summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ClockTAI.cpp562
-rw-r--r--src/ClockTAI.h102
-rw-r--r--src/ConfigParser.cpp85
-rw-r--r--src/DabMultiplexer.cpp96
-rw-r--r--src/DabMultiplexer.h11
-rw-r--r--src/DabMux.cpp39
-rw-r--r--src/InetAddress.cpp155
-rw-r--r--src/InetAddress.h78
-rw-r--r--src/Log.cpp143
-rw-r--r--src/Log.h157
-rw-r--r--src/ManagementServer.cpp20
-rw-r--r--src/ManagementServer.h4
-rw-r--r--src/MuxElements.cpp11
-rw-r--r--src/MuxElements.h3
-rw-r--r--src/ReedSolomon.cpp116
-rw-r--r--src/ReedSolomon.h56
-rw-r--r--src/RemoteControl.cpp595
-rw-r--r--src/RemoteControl.h263
-rw-r--r--src/TcpSocket.cpp359
-rw-r--r--src/TcpSocket.h164
-rw-r--r--src/ThreadsafeQueue.h178
-rw-r--r--src/UdpSocket.cpp256
-rw-r--r--src/UdpSocket.h174
-rw-r--r--src/crc.c266
-rw-r--r--src/crc.h59
-rw-r--r--src/dabOutput/dabOutput.h21
-rw-r--r--src/dabOutput/dabOutputTcp.cpp2
-rw-r--r--src/dabOutput/dabOutputUdp.cpp65
-rw-r--r--src/dabOutput/edi/AFPacket.cpp96
-rw-r--r--src/dabOutput/edi/AFPacket.h61
-rw-r--r--src/dabOutput/edi/Config.h77
-rw-r--r--src/dabOutput/edi/Interleaver.cpp122
-rw-r--r--src/dabOutput/edi/Interleaver.h75
-rw-r--r--src/dabOutput/edi/PFT.cpp327
-rw-r--r--src/dabOutput/edi/PFT.h78
-rw-r--r--src/dabOutput/edi/TagItems.cpp216
-rw-r--r--src/dabOutput/edi/TagItems.h149
-rw-r--r--src/dabOutput/edi/TagPacket.cpp79
-rw-r--r--src/dabOutput/edi/TagPacket.h56
-rw-r--r--src/dabOutput/edi/Transport.cpp182
-rw-r--r--src/dabOutput/edi/Transport.h69
-rw-r--r--src/dabOutput/metadata.h2
-rw-r--r--src/fig/FIG0_19.cpp13
-rw-r--r--src/input/Edi.cpp427
-rw-r--r--src/input/Edi.h126
-rw-r--r--src/input/File.cpp37
-rw-r--r--src/input/File.h13
-rw-r--r--src/input/Prbs.cpp18
-rw-r--r--src/input/Prbs.h7
-rw-r--r--src/input/Udp.cpp89
-rw-r--r--src/input/Udp.h15
-rw-r--r--src/input/Zmq.cpp34
-rw-r--r--src/input/Zmq.h13
-rw-r--r--src/input/inputs.h54
-rw-r--r--src/utils.cpp3
-rw-r--r--src/zmq2edi/EDISender.cpp2
-rw-r--r--src/zmq2edi/EDISender.h6
-rw-r--r--src/zmq2edi/zmq2edi.cpp168
-rw-r--r--src/zmq2farsync/zmq2farsync.cpp115
59 files changed, 1111 insertions, 5658 deletions
diff --git a/src/ClockTAI.cpp b/src/ClockTAI.cpp
deleted file mode 100644
index c376c07..0000000
--- a/src/ClockTAI.cpp
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-/* This file downloads the TAI-UTC bulletins from the from IETF and parses them
- * so that correct time can be communicated in EDI timestamps.
- *
- * This file contains self-test code that can be executed by running
- * g++ -g -Wall -DTEST -DHAVE_CURL -std=c++11 -lcurl -pthread \
- * ClockTAI.cpp Log.cpp RemoteControl.cpp -lboost_system -o taitest && ./taitest
- */
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "ClockTAI.h"
-#include "Log.h"
-
-#include <time.h>
-#include <stdio.h>
-#include <errno.h>
-#if SUPPORT_SETTING_CLOCK_TAI
-# include <sys/timex.h>
-#endif
-#ifdef HAVE_CURL
-# include <curl/curl.h>
-#endif
-#include <array>
-#include <string>
-#include <iostream>
-#include <algorithm>
-#include <regex>
-
-using namespace std;
-
-#ifdef TEST
-static bool wait_longer = true;
-#endif
-
-constexpr int download_retry_interval_hours = 1;
-
-// Offset between NTP time and POSIX time:
-// timestamp_unix = timestamp_ntp - ntp_unix_offset
-const int64_t ntp_unix_offset = 2208988800L;
-
-// leap seconds insertion bulletin is available from the IETF and in the TZ
-// distribution
-static array<const char*, 2> default_tai_urls = {
- "https://www.ietf.org/timezones/data/leap-seconds.list",
- "https://raw.githubusercontent.com/eggert/tz/master/leap-seconds.list",
-};
-
-// According to the Filesystem Hierarchy Standard, the data in
-// /var/tmp "must not be deleted when the system is booted."
-static const char *tai_cache_location = "/var/tmp/odr-dabmux-leap-seconds.cache";
-
-// read TAI offset from a valid bulletin in IETF format
-static int parse_ietf_bulletin(const std::string& bulletin)
-{
- // Example Line:
- // 3692217600 37 # 1 Jan 2017
- //
- // NTP timestamp<TAB>leap seconds<TAB># some comment
- // The NTP timestamp starts at epoch 1.1.1900.
- // The difference between NTP timestamps and unix epoch is 70
- // years i.e. 2208988800 seconds
-
- std::regex regex_bulletin(R"(([0-9]+)\s+([0-9]+)\s+#.*)");
-
- time_t now = time(nullptr);
-
- int tai_utc_offset = 0;
-
- int tai_utc_offset_valid = false;
-
- stringstream ss(bulletin);
-
- /* We cannot just take the last line, because it might
- * be in the future, announcing an upcoming leap second.
- *
- * So we need to look at the current date, and compare it
- * with the date of the leap second.
- */
- for (string line; getline(ss, line); ) {
-
- std::smatch bulletin_entry;
-
- bool is_match = std::regex_search(line, bulletin_entry, regex_bulletin);
- if (is_match) {
- if (bulletin_entry.size() != 3) {
- throw runtime_error(
- "Incorrect number of matched TAI IETF bulletin entries");
- }
- const string bulletin_ntp_timestamp(bulletin_entry[1]);
- const string bulletin_offset(bulletin_entry[2]);
-
- const int64_t timestamp_unix =
- std::atoll(bulletin_ntp_timestamp.c_str()) - ntp_unix_offset;
-
- const int offset = std::atoi(bulletin_offset.c_str());
- // Ignore entries announcing leap seconds in the future
- if (timestamp_unix < now) {
- tai_utc_offset = offset;
- tai_utc_offset_valid = true;
- }
-#if TEST
- else {
- cerr << "IETF Ignoring offset " << bulletin_offset <<
- " at TS " << bulletin_ntp_timestamp <<
- " in the future" << endl;
- }
-#endif
- }
- }
-
- if (not tai_utc_offset_valid) {
- throw runtime_error("No data in TAI bulletin");
- }
-
- return tai_utc_offset;
-}
-
-
-struct bulletin_state {
- bool valid = false;
- int64_t expiry = 0;
- int offset = 0;
-
- bool usable() const { return valid and expiry > 0; }
-};
-
-static bulletin_state parse_bulletin(const string& bulletin)
-{
- // The bulletin contains one line that specifies an expiration date
- // in NTP time. If that point in time is in the future, we consider
- // the bulletin valid.
- //
- // The entry looks like this:
- //#@ 3707596800
-
- bulletin_state ret;
-
- std::regex regex_expiration(R"(#@\s+([0-9]+))");
-
- time_t now = time(nullptr);
-
- stringstream ss(bulletin);
-
- for (string line; getline(ss, line); ) {
- std::smatch bulletin_entry;
-
- bool is_match = std::regex_search(line, bulletin_entry, regex_expiration);
- if (is_match) {
- if (bulletin_entry.size() != 2) {
- throw runtime_error(
- "Incorrect number of matched TAI IETF bulletin expiration");
- }
- const string expiry_data_str(bulletin_entry[1]);
- const int64_t expiry_unix =
- std::atoll(expiry_data_str.c_str()) - ntp_unix_offset;
-
-#ifdef TEST
- etiLog.level(info) << "Bulletin expires in " << expiry_unix - now;
-#endif
- ret.expiry = expiry_unix - now;
- try {
- ret.offset = parse_ietf_bulletin(bulletin);
- ret.valid = true;
- }
- catch (const runtime_error& e) {
- etiLog.level(warn) << "Bulletin expiry ok but parse error: " << e.what();
- }
- break;
- }
- }
- return ret;
-}
-
-
-// callback that receives data from cURL
-static size_t fill_bulletin(char *ptr, size_t size, size_t nmemb, void *ctx)
-{
- auto *bulletin = reinterpret_cast<stringstream*>(ctx);
-
- size_t len = size * nmemb;
- for (size_t i = 0; i < len; i++) {
- *bulletin << ptr[i];
- }
- return len;
-}
-
-static string download_tai_utc_bulletin(const char* url)
-{
- stringstream bulletin;
-
-#ifdef HAVE_CURL
- CURL *curl;
- CURLcode res;
-
- curl = curl_easy_init();
- if (curl) {
- curl_easy_setopt(curl, CURLOPT_URL, url);
- /* Tell libcurl to follow redirection */
- curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
- curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, fill_bulletin);
- curl_easy_setopt(curl, CURLOPT_WRITEDATA, &bulletin);
-
- res = curl_easy_perform(curl);
- /* always cleanup ! */
- curl_easy_cleanup(curl);
-
- if (res != CURLE_OK) {
- throw runtime_error( "TAI-UTC bulletin download failed: " +
- string(curl_easy_strerror(res)));
- }
- }
- return bulletin.str();
-#else
- throw runtime_error("Cannot download TAI Clock information without cURL");
-#endif // HAVE_CURL
-}
-
-static string load_bulletin_from_file(const char* cache_filename)
-{
- // Clear the bulletin
- ifstream f(cache_filename);
- if (not f.good()) {
- return {};
- }
-
- stringstream ss;
- ss << f.rdbuf();
- f.close();
-
- return ss.str();
-}
-
-ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) :
- RemoteControllable("clocktai")
-{
- RC_ADD_PARAMETER(expiry, "Number of seconds until TAI Bulletin expires");
-
- if (bulletin_urls.empty()) {
- etiLog.level(debug) << "Initialising default TAI Bulletin URLs";
- for (const auto url : default_tai_urls) {
- m_bulletin_urls.push_back(url);
- }
- }
- else {
- etiLog.level(debug) << "Initialising user-configured TAI Bulletin URLs";
- m_bulletin_urls = bulletin_urls;
- }
-
- for (const auto url : m_bulletin_urls) {
- etiLog.level(info) << "TAI Bulletin URL: '" << url << "'";
- }
-}
-
-int ClockTAI::get_valid_offset()
-{
- int offset = 0;
- bool offset_valid = false;
-
- std::unique_lock<std::mutex> lock(m_data_mutex);
-
- const auto state = parse_bulletin(m_bulletin);
- if (state.usable()) {
-#if TEST
- etiLog.level(info) << "Bulletin already valid";
-#endif
- offset = state.offset;
- offset_valid = true;
- }
- else {
- const auto cache_bulletin = load_bulletin_from_file(tai_cache_location);
- const auto cache_state = parse_bulletin(cache_bulletin);
-
- if (cache_state.usable()) {
- m_bulletin = cache_bulletin;
- offset = cache_state.offset;
- offset_valid = true;
-#if TEST
- etiLog.level(info) << "Bulletin from cache valid with offset=" << offset;
-#endif
- }
- else {
- for (const auto url : m_bulletin_urls) {
- try {
-#if TEST
- etiLog.level(info) << "Load bulletin from " << url;
-#endif
- const auto new_bulletin = download_tai_utc_bulletin(url.c_str());
- const auto new_state = parse_bulletin(new_bulletin);
- if (new_state.usable()) {
- m_bulletin = new_bulletin;
- offset = new_state.offset;
- offset_valid = true;
-
- etiLog.level(debug) << "Loaded valid TAI Bulletin from " <<
- url << " giving offset=" << offset;
- }
- else {
- etiLog.level(debug) << "Skipping invalid TAI bulletin from "
- << url;
- }
- }
- catch (const runtime_error& e) {
- etiLog.level(warn) <<
- "TAI-UTC offset could not be retrieved from " <<
- url << " : " << e.what();
- }
-
- if (offset_valid) {
- update_cache(tai_cache_location);
- break;
- }
- }
- }
- }
-
- if (offset_valid) {
- // With the current evolution of the offset, we're probably going
- // to reach 500 long after DAB gets replaced by another standard.
- if (offset < 0 or offset > 500) {
- stringstream ss;
- ss << "TAI offset " << offset << " out of range";
- throw range_error(ss.str());
- }
-
- return offset;
- }
- else {
- // Try again later
- throw download_failed();
- }
-}
-
-
-int ClockTAI::get_offset()
-{
- using namespace std::chrono;
- const auto time_now = system_clock::now();
-
- std::unique_lock<std::mutex> lock(m_data_mutex);
-
- if (not m_offset_valid) {
-#ifdef TEST
- // Assume we've downloaded it in the past:
-
- m_offset = 37; // Valid in early 2017
- m_offset_valid = true;
-
- // Simulate requiring a new download
- m_bulletin_download_time = time_now - hours(24 * 40);
-#else
- // First time we run we must block until we know
- // the offset
- lock.unlock();
- try {
- m_offset = get_valid_offset();
- }
- catch (const download_failed&) {
- throw runtime_error("Unable to download TAI bulletin");
- }
- lock.lock();
- m_offset_valid = true;
- m_bulletin_download_time = time_now;
-#endif
- etiLog.level(info) <<
- "Initialised TAI-UTC offset to " << m_offset << "s.";
- }
-
- if (time_now - m_bulletin_download_time > hours(24 * 31)) {
- // Refresh if it's older than one month. Leap seconds are
- // announced several months in advance
- etiLog.level(debug) << "Trying to refresh TAI bulletin";
-
- if (m_offset_future.valid()) {
- auto state = m_offset_future.wait_for(seconds(0));
- switch (state) {
- case future_status::ready:
- try {
- m_offset = m_offset_future.get();
- m_offset_valid = true;
- m_bulletin_download_time = time_now;
-
- etiLog.level(info) <<
- "Updated TAI-UTC offset to " << m_offset << "s.";
- }
- catch (const download_failed&) {
- etiLog.level(warn) <<
- "TAI-UTC download failed, will retry in " <<
- download_retry_interval_hours << " hour(s)";
-
- m_bulletin_download_time += hours(download_retry_interval_hours);
- }
-#ifdef TEST
- wait_longer = false;
-#endif
- break;
-
- case future_status::deferred:
- case future_status::timeout:
- // Not ready yet
-#ifdef TEST
- etiLog.level(debug) << " async not ready yet";
-#endif
- break;
- }
- }
- else {
-#ifdef TEST
- etiLog.level(debug) << " Launch async";
-#endif
- m_offset_future = async(launch::async, &ClockTAI::get_valid_offset, this);
- }
- }
-
- return m_offset;
-}
-
-#if SUPPORT_SETTING_CLOCK_TAI
-int ClockTAI::update_local_tai_clock(int offset)
-{
- struct timex timex_request;
- timex_request.modes = ADJ_TAI;
- timex_request.constant = offset;
-
- int err = adjtimex(&timex_request);
- if (err == -1) {
- perror("adjtimex");
- }
-
- printf("adjtimex: %d, tai %d\n", err, timex_request.tai);
-
- return err;
-}
-#endif
-
-void ClockTAI::update_cache(const char* cache_filename)
-{
- ofstream f(cache_filename);
- if (not f.good()) {
- throw runtime_error("TAI-UTC bulletin open cache for writing");
- }
-
- f << m_bulletin;
- f.close();
-}
-
-
-void ClockTAI::set_parameter(const string& parameter, const string& value)
-{
- if (parameter == "expiry") {
- throw ParameterError("Parameter '" + parameter +
- "' is not read-only in controllable " + get_rc_name());
- }
- else {
- throw ParameterError("Parameter '" + parameter +
- "' is not exported by controllable " + get_rc_name());
- }
-}
-
-const string ClockTAI::get_parameter(const string& parameter) const
-{
- if (parameter == "expiry") {
- std::unique_lock<std::mutex> lock(m_data_mutex);
- const int64_t expiry = parse_bulletin(m_bulletin).expiry;
- if (expiry > 0) {
- return to_string(expiry);
- }
- else {
- return "Bulletin expired or invalid!";
- }
- }
- else {
- throw ParameterError("Parameter '" + parameter +
- "' is not exported by controllable " + get_rc_name());
- }
-}
-
-#if 0
-// Example testing code
-void debug_tai_clk()
-{
- struct timespec rt_clk;
-
- int err = clock_gettime(CLOCK_REALTIME, &rt_clk);
- if (err) {
- perror("REALTIME clock_gettime failed");
- }
-
- struct timespec tai_clk;
-
- err = clock_gettime(CLOCK_TAI, &tai_clk);
- if (err) {
- perror("TAI clock_gettime failed");
- }
-
- printf("RT - TAI = %ld\n", rt_clk.tv_sec - tai_clk.tv_sec);
-
-
- struct timex timex_request;
- timex_request.modes = 0; // Do not set anything
-
- err = adjtimex(&timex_request);
- if (err == -1) {
- perror("adjtimex");
- }
-
- printf("adjtimex: %d, tai %d\n", err, timex_request.tai);
-}
-#endif
-
-#if TEST
-int main(int argc, char **argv)
-{
- using namespace std;
-
- ClockTAI tai({});
-
- while (wait_longer) {
- try {
- etiLog.level(info) <<
- "Offset is " << tai.get_offset();
- }
- catch (const exception &e) {
- etiLog.level(error) <<
- "Exception " << e.what();
- }
-
- this_thread::sleep_for(chrono::seconds(2));
- }
-
- return 0;
-}
-#endif
-
diff --git a/src/ClockTAI.h b/src/ClockTAI.h
deleted file mode 100644
index 4b3c2ff..0000000
--- a/src/ClockTAI.h
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
- 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-/* The EDI output needs TAI clock, according to ETSI TS 102 693 Annex F
- * "EDI Timestamps". This module can set the local CLOCK_TAI clock by
- * setting the TAI-UTC offset using adjtimex.
- *
- * This functionality requires Linux 3.10 (30 Jun 2013) or newer.
- */
-
-#pragma once
-
-#include <stdint.h>
-#include <stdlib.h>
-#include <sstream>
-#include <chrono>
-#include <future>
-#include <mutex>
-#include <string>
-#include <vector>
-#include "RemoteControl.h"
-
-// EDI needs to know UTC-TAI, but doesn't need the CLOCK_TAI to be set.
-// We can keep this code, maybe for future use
-#define SUPPORT_SETTING_CLOCK_TAI 0
-
-/* Loads, parses and represents TAI-UTC offset information from the IETF bulletin */
-class ClockTAI : public RemoteControllable {
- public:
- ClockTAI(const std::vector<std::string>& bulletin_urls);
-
- // Fetch the bulletin from the IETF website and return the current
- // TAI-UTC offset.
- // Throws runtime_error on failure.
- int get_offset(void);
-
-#if SUPPORT_SETTING_CLOCK_TAI
- // Update the local TAI clock according to the TAI-UTC offset
- // return 0 on success
- int update_local_tai_clock(int offset);
-#endif
-
- private:
- class download_failed {};
-
- // Either retrieve the bulletin from the cache or if necessarly
- // download it, and calculate the TAI-UTC offset.
- // Returns the offset or throws download_failed or a range_error
- // if the offset is out of bounds.
- int get_valid_offset(void);
-
- // Download of new bulletin is done asynchronously
- std::future<int> m_offset_future;
-
- // Protect all data members, as RC functions are in another thread
- mutable std::mutex m_data_mutex;
-
- // The currently used TAI-UTC offset
- int m_offset = 0;
- int m_offset_valid = false;
-
- std::vector<std::string> m_bulletin_urls;
-
- std::string m_bulletin;
- std::chrono::system_clock::time_point m_bulletin_download_time;
-
- // Update the cache file with the current m_bulletin
- void update_cache(const char* cache_filename);
-
-
- /* Remote control */
- virtual void set_parameter(const std::string& parameter,
- const std::string& value);
-
- /* Getting a parameter always returns a string. */
- virtual const std::string get_parameter(const std::string& parameter) const;
-};
-
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index fb49efc..776ddc8 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -40,6 +40,7 @@
#include "utils.h"
#include "DabMux.h"
#include "ManagementServer.h"
+#include "input/Edi.h"
#include "input/Prbs.h"
#include "input/Zmq.h"
#include "input/File.h"
@@ -50,11 +51,12 @@
#include <boost/algorithm/string/split.hpp>
#include <cstdint>
#include <cstring>
-#include <memory>
+#include <chrono>
#include <exception>
#include <iostream>
-#include <string>
#include <map>
+#include <memory>
+#include <string>
#include <vector>
using namespace std;
@@ -876,34 +878,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
type = pt.get<string>("type");
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no type defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");
}
- /* Both inputfile and inputuri are supported, and are equivalent.
- * inputuri has precedence
+ /* Up to v2.3.1, both inputfile and inputuri are supported, and are
+ * equivalent. inputuri has precedence.
+ *
+ * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.
*/
string inputUri = pt.get<string>("inputuri", "");
+ string proto = pt.get<string>("inputproto", "");
- if (inputUri == "") {
+ if (inputUri.empty() and proto.empty()) {
try {
+ /* Old approach, derives proto from scheme used in the URL.
+ * This makes it impossible to distinguish between ZMQ tcp:// and
+ * EDI tcp://
+ */
inputUri = pt.get<string>("inputfile");
+ size_t protopos = inputUri.find("://");
+
+ if (protopos == string::npos) {
+ proto = "file";
+ }
+ else {
+ proto = inputUri.substr(0, protopos);
+
+ if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ proto = "zmq";
+ }
+ else if (proto == "sti-rtp") {
+ proto = "sti";
+ }
+ }
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");
}
}
-
- string proto;
- size_t protopos = inputUri.find("://");
- if (protopos == string::npos) {
- proto = "file";
- }
- else {
- proto = inputUri.substr(0, protopos);
+ else if (inputUri.empty() or proto.empty()) {
+ throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);
}
subchan->inputUri = inputUri;
@@ -928,7 +942,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
throw logic_error("Incomplete handling of file input");
}
}
- else if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ else if (proto == "zmq") {
auto zmqconfig = setup_zmq_input(pt, subchanuid);
if (type == "audio") {
@@ -941,15 +955,16 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
rcs.enrol(inzmq.get());
subchan->input = inzmq;
}
-
- if (proto == "epgm") {
- etiLog.level(warn) << "Using untested epgm:// zeromq input";
- }
- else if (proto == "ipc") {
- etiLog.level(warn) << "Using untested ipc:// zeromq input";
- }
}
- else if (proto == "sti-rtp") {
+ else if (proto == "edi") {
+ Inputs::dab_input_edi_config_t config;
+ config.buffer_size = pt.get("buffer", config.buffer_size);
+ config.prebuffering = pt.get("prebuffering", config.prebuffering);
+ auto inedi = make_shared<Inputs::Edi>(subchanuid, config);
+ rcs.enrol(inedi.get());
+ subchan->input = inedi;
+ }
+ else if (proto == "stp") {
subchan->input = make_shared<Inputs::Sti_d_Rtp>();
}
else {
@@ -1012,6 +1027,20 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
}
}
+ const string bufferManagement = pt.get("buffer-management", "prebuffering");
+ if (bufferManagement == "prebuffering") {
+ subchan->input->setBufferManagement(Inputs::BufferManagement::Prebuffering);
+ }
+ else if (bufferManagement == "timestamped") {
+ subchan->input->setBufferManagement(Inputs::BufferManagement::Timestamped);
+ }
+ else {
+ throw runtime_error("Subchannel with uid " + subchanuid + " has invalid buffer-management !");
+ }
+
+ const int32_t tist_delay = pt.get("tist-delay", 0);
+ subchan->input->setTistDelay(chrono::milliseconds(tist_delay));
+
subchan->startAddress = 0;
dabProtection* protection = &subchan->protection;
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 9ff28a3..2bd8d74 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -134,24 +134,23 @@ void DabMultiplexer::prepare(bool require_tai_clock)
throw MuxInitException();
}
- /* TODO:
- * In a SFN, when reconfiguring the ensemble, the multiplexer
- * has to be restarted (odr-dabmux doesn't support reconfiguration).
- * Ideally, we must be able to restart transmission s.t. the receiver
- * synchronisation is preserved.
+ /* Ensure edi_time and TIST represent current time. Keep
+ * a granularity of 24ms, which corresponds to the
+ * duration of an ETI frame, to get nicer timestamps.
*/
using Sec = chrono::seconds;
- const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now());
-
- edi_time = chrono::system_clock::to_time_t(now);
-
- // We define that when the time is multiple of six seconds, the timestamp
- // (PPS offset) is 0. This ensures consistency of TIST even across a
- // mux restart
+ const auto now = chrono::system_clock::now();
+ edi_time = chrono::system_clock::to_time_t(chrono::time_point_cast<Sec>(now));
+ auto offset = now - chrono::time_point_cast<Sec>(now);
+ if (offset >= chrono::seconds(1)) {
+ throw std::logic_error("Invalid startup offset calculation for TIST! " +
+ to_string(chrono::duration_cast<chrono::milliseconds>(offset).count()) +
+ " ms");
+ }
timestamp = 0;
- edi_time -= (edi_time % 6);
- while (edi_time < chrono::system_clock::to_time_t(now)) {
+ while (offset >= chrono::milliseconds(24)) {
increment_timestamp();
+ offset -= chrono::milliseconds(24);
}
// Try to load offset once
@@ -284,13 +283,13 @@ void DabMultiplexer::prepare_services_components()
component->subchId, component->serviceId);
throw MuxInitException();
}
- if ((*subchannel)->type != subchannel_type_t::Packet) continue;
- component->packet.id = cur_packetid++;
+ if ((*subchannel)->type == subchannel_type_t::Packet) {
+ component->packet.id = cur_packetid++;
+ }
rcs.enrol(component.get());
}
-
}
void DabMultiplexer::prepare_data_inputs()
@@ -308,10 +307,8 @@ void DabMultiplexer::prepare_data_inputs()
(*subchannel)->startAddress = (*(subchannel - 1))->startAddress +
(*(subchannel - 1))->getSizeCu();
}
- if ((*subchannel)->input->open((*subchannel)->inputUri) == -1) {
- perror((*subchannel)->inputUri.c_str());
- throw MuxInitException();
- }
+
+ (*subchannel)->input->open((*subchannel)->inputUri);
// TODO Check errors
int subch_bitrate = (*subchannel)->input->setBitrate( (*subchannel)->bitrate);
@@ -376,12 +373,23 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
// For EDI, save ETI(LI) Management data into a TAG Item DETI
edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
+ edi::TagStarPTR edi_tagStarPtr("DETI");
map<DabSubchannel*, edi::TagESTn> edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
+ const bool tist_enabled = m_pt.get("general.tist", false);
+
+ int tai_utc_offset = 0;
+ if (tist_enabled and m_tai_clock_required) {
+ try {
+ tai_utc_offset = m_clock_tai.get_offset();
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp";
+ }
+ }
update_dab_time();
// Initialise the ETI frame
@@ -583,8 +591,9 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
edi::TagESTn& tag = edi_subchannelToTag[subchannel.get()];
int sizeSubchannel = subchannel->getSizeByte();
- int result = subchannel->input->readFrame(
- &etiFrame[index], sizeSubchannel);
+ // no need to check enableTist because we always increment the timestamp
+ int result = subchannel->readFrame(&etiFrame[index],
+ sizeSubchannel, edi_time + m_tist_offset, tai_utc_offset, timestamp);
if (result < 0) {
etiLog.log(info,
@@ -637,34 +646,25 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
edi_tagDETI.tsta = 0xffffff;
}
- const bool tist_enabled = m_pt.get("general.tist", false);
-
if (tist_enabled and m_tai_clock_required) {
- try {
- const auto tai_utc_offset = m_clock_tai.get_offset();
- edi_tagDETI.set_edi_time(edi_time + m_tist_offset, tai_utc_offset);
- edi_tagDETI.atstf = true;
-
- for (auto output : outputs) {
- shared_ptr<OutputMetadata> md_utco =
- make_shared<OutputMetadataUTCO>(edi_tagDETI.utco);
- output->setMetadata(md_utco);
-
- shared_ptr<OutputMetadata> md_edi_time =
- make_shared<OutputMetadataEDITime>(edi_tagDETI.seconds);
- output->setMetadata(md_edi_time);
-
- shared_ptr<OutputMetadata> md_dlfc =
- make_shared<OutputMetadataDLFC>(currentFrame % 5000);
- output->setMetadata(md_dlfc);
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp";
+ edi_tagDETI.set_edi_time(edi_time + m_tist_offset, tai_utc_offset);
+ edi_tagDETI.atstf = true;
+
+ for (auto output : outputs) {
+ shared_ptr<OutputMetadata> md_utco =
+ make_shared<OutputMetadataUTCO>(edi_tagDETI.utco);
+ output->setMetadata(md_utco);
+
+ shared_ptr<OutputMetadata> md_edi_time =
+ make_shared<OutputMetadataEDITime>(edi_tagDETI.seconds);
+ output->setMetadata(md_edi_time);
+
+ shared_ptr<OutputMetadata> md_dlfc =
+ make_shared<OutputMetadataDLFC>(currentFrame % 5000);
+ output->setMetadata(md_dlfc);
}
}
-
/* Coding of the TIST, according to ETS 300 799 Annex C
Bit number b0(MSb)..b6 b7..b9 b10..b17 b18..b20 b21..b23(LSb)
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index 386c23c..56a8dde 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -30,15 +30,14 @@
#endif
#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/Transport.h"
+#include "edioutput/TagItems.h"
+#include "edioutput/TagPacket.h"
+#include "edioutput/AFPacket.h"
+#include "edioutput/Transport.h"
#include "fig/FIGCarousel.h"
#include "crc.h"
#include "utils.h"
-#include "UdpSocket.h"
-#include "InetAddress.h"
+#include "Socket.h"
#include "PcDebug.h"
#include "MuxElements.h"
#include "RemoteControl.h"
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 51f0310..de0c362 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -99,8 +99,7 @@ typedef DWORD32 uint32_t;
#include "dabOutput/dabOutput.h"
#include "crc.h"
-#include "UdpSocket.h"
-#include "InetAddress.h"
+#include "Socket.h"
#include "PcDebug.h"
#include "DabMux.h"
#include "MuxElements.h"
@@ -240,6 +239,26 @@ int main(int argc, char *argv[])
etiLog.register_backend(std::make_shared<LogToSyslog>());
}
+ const auto startupcheck = pt.get<string>("general.startupcheck", "");
+ if (not startupcheck.empty()) {
+ etiLog.level(info) << "Running startup check '" << startupcheck << "'";
+ int wstatus = system(startupcheck.c_str());
+
+ if (WIFEXITED(wstatus)) {
+ if (WEXITSTATUS(wstatus) == 0) {
+ etiLog.level(info) << "Startup check ok";
+ }
+ else {
+ etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus);
+ return 1;
+ }
+ }
+ else {
+ etiLog.level(error) << "Startup check failed, child didn't terminate normally";
+ return 1;
+ }
+ }
+
int mgmtserverport = pt.get<int>("general.managementport",
pt.get<int>("general.statsserverport", 0) );
@@ -292,6 +311,8 @@ int main(int argc, char *argv[])
if (outputuid == "edi") {
ptree pt_edi = pt_outputs.get_child("edi");
+ bool require_dest_port = false;
+
for (auto pt_edi_dest : pt_edi.get_child("destinations")) {
const auto proto = pt_edi_dest.second.get<string>("protocol", "udp");
if (proto == "udp") {
@@ -303,9 +324,11 @@ int main(int argc, char *argv[])
dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport");
edi_conf.destinations.push_back(dest);
+
+ require_dest_port = true;
}
else if (proto == "tcp") {
- auto dest = make_shared<edi::tcp_destination_t>();
+ auto dest = make_shared<edi::tcp_server_t>();
dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");
dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);
edi_conf.destinations.push_back(dest);
@@ -315,11 +338,13 @@ int main(int argc, char *argv[])
}
}
- edi_conf.dest_port = pt_edi.get<unsigned int>("port");
+ if (require_dest_port) {
+ edi_conf.dest_port = pt_edi.get<unsigned int>("port");
+ }
- edi_conf.dump = pt_edi.get<bool>("dump");
- edi_conf.enable_pft = pt_edi.get<bool>("enable_pft");
- edi_conf.verbose = pt_edi.get<bool>("verbose");
+ edi_conf.dump = pt_edi.get<bool>("dump", false);
+ edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false);
+ edi_conf.verbose = pt_edi.get<bool>("verbose", false);
edi_conf.fec = pt_edi.get<unsigned int>("fec", 3);
edi_conf.chunk_len = pt_edi.get<unsigned int>("chunk_len", 207);
diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp
deleted file mode 100644
index 7660263..0000000
--- a/src/InetAddress.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "InetAddress.h"
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
-
-#ifdef TRACE_ON
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl
-# define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl
-# endif
-#else
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func)
-# define TRACE_STATIC(clas, func)
-# endif
-#endif
-
-
-int inetErrNo = 0;
-const char *inetErrMsg = nullptr;
-const char *inetErrDesc = nullptr;
-
-
-/**
- * Constructs an IP address.
- * @param port The port of this address
- * @param name The name of this address
- */
-InetAddress::InetAddress(int port, const char* name) {
- TRACE_CLASS("InetAddress", "InetAddress(int, char)");
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
- if (name)
- setAddress(name);
-}
-
-
-/// Returns the raw IP address of this InetAddress object.
-sockaddr *InetAddress::getAddress() {
- TRACE_CLASS("InetAddress", "getAddress()");
- return (sockaddr *)&addr;
-}
-
-
-/// Return the port of this address.
-int InetAddress::getPort()
-{
- TRACE_CLASS("InetAddress", "getPort()");
- return ntohs(addr.sin_port);
-}
-
-
-/**
- * Returns the IP address string "%d.%d.%d.%d".
- * @return IP address
- */
-const char *InetAddress::getHostAddress() {
- TRACE_CLASS("InetAddress", "getHostAddress()");
- return inet_ntoa(addr.sin_addr);
-}
-
-
-/// Returns true if this address is multicast
-bool InetAddress::isMulticastAddress() {
- TRACE_CLASS("InetAddress", "isMulticastAddress()");
- return IN_MULTICAST(ntohl(addr.sin_addr.s_addr)); // a modifier
-}
-
-
-/**
- * Set the port number
- * @param port The new port number
- */
-void InetAddress::setPort(int port)
-{
- TRACE_CLASS("InetAddress", "setPort(int)");
- addr.sin_port = htons(port);
-}
-
-
-/**
- * Set the address
- * @param name The new address name
- * @return 0 if ok
- * -1 if error
- */
-int InetAddress::setAddress(const std::string& name)
-{
- TRACE_CLASS("InetAddress", "setAddress(string)");
- if (!name.empty()) {
- if (atoi(name.c_str())) { // If it start with a number
- if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Invalid address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- else { // Assume it's a real name
- hostent *host = gethostbyname(name.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
- } else {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Could not find address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- }
- else {
- addr.sin_addr.s_addr = INADDR_ANY;
- }
- return 0;
-}
-
-
-void setInetError(const char* description)
-{
- inetErrNo = 0;
- inetErrNo = errno;
- inetErrMsg = strerror(inetErrNo);
- inetErrDesc = description;
-}
-
diff --git a/src/InetAddress.h b/src/InetAddress.h
deleted file mode 100644
index e246d4c..0000000
--- a/src/InetAddress.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _InetAddress
-#define _InetAddress
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <stdlib.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#include <string>
-
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define INVALID_PORT -1
-
-
-/// The last error number
-extern int inetErrNo;
-/// The last error message
-extern const char *inetErrMsg;
-/// The description of the last error
-extern const char *inetErrDesc;
-/// Set the number, message and description of the last error
-void setInetError(const char* description);
-
-
-/**
- * This class represents an Internet Protocol (IP) address.
- * @author Pascal Charest pascal.charest@crc.ca
- */
-class InetAddress {
- public:
- InetAddress(int port = 0, const char* name = NULL);
-
- sockaddr *getAddress();
- const char *getHostAddress();
- int getPort();
- int setAddress(const std::string& name);
- void setPort(int port);
- bool isMulticastAddress();
-
- private:
- sockaddr_in addr;
-};
-
-
-#endif
diff --git a/src/Log.cpp b/src/Log.cpp
deleted file mode 100644
index 6b78fe0..0000000
--- a/src/Log.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include <list>
-#include <cstdarg>
-#include <chrono>
-
-#include "Log.h"
-
-using namespace std;
-
-/* etiLog is a singleton used in all parts of ODR-DabMod to output log messages.
- */
-Logger etiLog;
-
-void Logger::register_backend(std::shared_ptr<LogBackend> backend)
-{
- backends.push_back(backend);
-}
-
-
-void Logger::log(log_level_t level, const char* fmt, ...)
-{
- if (level == discard) {
- return;
- }
-
- int size = 100;
- std::string str;
- va_list ap;
- while (1) {
- str.resize(size);
- va_start(ap, fmt);
- int n = vsnprintf((char *)str.c_str(), size, fmt, ap);
- va_end(ap);
- if (n > -1 && n < size) {
- str.resize(n);
- break;
- }
- if (n > -1)
- size = n + 1;
- else
- size *= 2;
- }
-
- logstr(level, move(str));
-}
-
-void Logger::logstr(log_level_t level, std::string&& message)
-{
- if (level == discard) {
- return;
- }
-
- /* Remove a potential trailing newline.
- * It doesn't look good in syslog
- */
- if (message[message.length()-1] == '\n') {
- message.resize(message.length()-1);
- }
-
- for (auto &backend : backends) {
- backend->log(level, message);
- }
-
- {
- std::lock_guard<std::mutex> guard(m_cerr_mutex);
- std::cerr << levels_as_str[level] << " " << message << std::endl;
- }
-}
-
-
-LogLine Logger::level(log_level_t level)
-{
- return LogLine(this, level);
-}
-
-LogToFile::LogToFile(const std::string& filename) : name("FILE")
-{
- FILE* fd = fopen(filename.c_str(), "a");
- if (fd == nullptr) {
- fprintf(stderr, "Cannot open log file !");
- throw std::runtime_error("Cannot open log file !");
- }
-
- log_file.reset(fd);
-}
-
-void LogToFile::log(log_level_t level, const std::string& message)
-{
- if (level != log_level_t::discard) {
- const char* log_level_text[] = {
- "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};
-
- // fprintf is thread-safe
- fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n",
- log_level_text[(size_t)level], message.c_str());
- fflush(log_file.get());
- }
-}
-
-void LogToSyslog::log(log_level_t level, const std::string& message)
-{
- if (level != log_level_t::discard) {
- int syslog_level = LOG_EMERG;
- switch (level) {
- case debug: syslog_level = LOG_DEBUG; break;
- case info: syslog_level = LOG_INFO; break;
- /* we don't have the notice level */
- case warn: syslog_level = LOG_WARNING; break;
- case error: syslog_level = LOG_ERR; break;
- default: syslog_level = LOG_CRIT; break;
- case alert: syslog_level = LOG_ALERT; break;
- case emerg: syslog_level = LOG_EMERG; break;
- }
-
- syslog(syslog_level, SYSLOG_IDENT " %s", message.c_str());
- }
-}
diff --git a/src/Log.h b/src/Log.h
deleted file mode 100644
index 18f8c99..0000000
--- a/src/Log.h
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <syslog.h>
-#include <cstdarg>
-#include <cstdio>
-#include <fstream>
-#include <sstream>
-#include <iostream>
-#include <list>
-#include <stdexcept>
-#include <string>
-#include <mutex>
-#include <memory>
-
-#define SYSLOG_IDENT "ODR-DabMux"
-#define SYSLOG_FACILITY LOG_LOCAL0
-
-enum log_level_t {debug = 0, info, warn, error, alert, emerg, discard};
-
-static const std::string levels_as_str[] =
- { " ", " ", "WARN ", "ERROR", "ALERT", "EMERG", "-----"} ;
-
-/** Abstract class all backends must inherit from */
-class LogBackend {
- public:
- virtual ~LogBackend() {};
- virtual void log(log_level_t level, const std::string& message) = 0;
- virtual std::string get_name() const = 0;
-};
-
-/** A Logging backend for Syslog */
-class LogToSyslog : public LogBackend {
- public:
- LogToSyslog() : name("SYSLOG") {
- openlog(SYSLOG_IDENT, LOG_PID, SYSLOG_FACILITY);
- }
-
- virtual ~LogToSyslog() {
- closelog();
- }
-
- void log(log_level_t level, const std::string& message);
-
- std::string get_name() const { return name; }
-
- private:
- const std::string name;
-
- LogToSyslog(const LogToSyslog& other) = delete;
- const LogToSyslog& operator=(const LogToSyslog& other) = delete;
-};
-
-class LogToFile : public LogBackend {
- public:
- LogToFile(const std::string& filename);
- void log(log_level_t level, const std::string& message);
- std::string get_name() const { return name; }
-
- private:
- const std::string name;
-
- struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
- std::unique_ptr<FILE, FILEDeleter> log_file;
-
- LogToFile(const LogToFile& other) = delete;
- const LogToFile& operator=(const LogToFile& other) = delete;
-};
-
-class LogLine;
-
-class Logger {
- public:
- void register_backend(std::shared_ptr<LogBackend> backend);
-
- /* Log the message to all backends */
- void log(log_level_t level, const char* fmt, ...);
-
- void logstr(log_level_t level, std::string&& message);
-
- /* Return a LogLine for the given level
- * so that you can write etiLog.level(info) << "stuff = " << 21 */
- LogLine level(log_level_t level);
-
- private:
- std::list<std::shared_ptr<LogBackend> > backends;
-
- std::mutex m_cerr_mutex;
-};
-
-extern Logger etiLog;
-
-// Accumulate a line of logs, using same syntax as stringstream
-// The line is logged when the LogLine gets destroyed
-class LogLine {
- public:
- LogLine(const LogLine& logline);
- const LogLine& operator=(const LogLine& other) = delete;
- LogLine(Logger* logger, log_level_t level) :
- logger_(logger)
- {
- level_ = level;
- }
-
- // Push the new element into the stringstream
- template <typename T>
- LogLine& operator<<(T s) {
- if (level_ != discard) {
- os << s;
- }
- return *this;
- }
-
- ~LogLine()
- {
- if (level_ != discard) {
- logger_->logstr(level_, os.str());
- }
- }
-
- private:
- std::ostringstream os;
- log_level_t level_;
- Logger* logger_;
-};
-
-
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index 201fc7b..783a40b 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -485,6 +485,14 @@ void InputStat::notifyOverrun(void)
}
}
+void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s)
+{
+ unique_lock<mutex> lock(m_mutex);
+
+ m_version = version;
+ m_uptime_s = uptime_s;
+}
+
std::string InputStat::encodeValuesJSON()
{
std::ostringstream ss;
@@ -548,6 +556,13 @@ std::string InputStat::encodeValuesJSON()
return dB;
};
+ auto version = m_version;
+ size_t pos = 0;
+ while ((pos = version.find("\"", pos)) != std::string::npos) {
+ version.replace(pos, 1, "\\\"");
+ pos++;
+ }
+
ss <<
"{ \"inputstat\" : {"
"\"min_fill\": " << min_fill_buffer << ", "
@@ -557,7 +572,10 @@ std::string InputStat::encodeValuesJSON()
"\"peak_left_slow\": " << to_dB(peak_left) << ", "
"\"peak_right_slow\": " << to_dB(peak_right) << ", "
"\"num_underruns\": " << m_num_underruns << ", "
- "\"num_overruns\": " << m_num_overruns << ", ";
+ "\"num_overruns\": " << m_num_overruns << ", "
+ "\"version\": \"" << version << "\", "
+ "\"uptime\": " << m_uptime_s << ", "
+ ;
ss << "\"state\": ";
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index 18af48c..5b52957 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -100,6 +100,7 @@ class InputStat
void notifyPeakLevels(int peak_left, int peak_right);
void notifyUnderrun(void);
void notifyOverrun(void);
+ void notifyVersion(const std::string& version, uint32_t uptime_s);
std::string encodeValuesJSON(void);
input_state_t determineState(void);
@@ -131,6 +132,9 @@ class InputStat
size_t m_short_window_length = 0;
+ std::string m_version;
+ uint32_t m_uptime_s = 0;
+
/************* STATE ***************/
/* Variables used for determining the input state */
int m_glitch_counter = 0; // saturating counter
diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp
index ad1fcd4..81466a8 100644
--- a/src/MuxElements.cpp
+++ b/src/MuxElements.cpp
@@ -784,6 +784,17 @@ unsigned short DabSubchannel::getSizeDWord() const
return (bitrate * 3) >> 3;
}
+size_t DabSubchannel::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ switch (input->getBufferManagement()) {
+ case Inputs::BufferManagement::Prebuffering:
+ return input->readFrame(buffer, size);
+ case Inputs::BufferManagement::Timestamped:
+ return input->readFrame(buffer, size, seconds, utco, tsta);
+ }
+ throw logic_error("Unhandled case");
+}
+
LinkageSet::LinkageSet(const std::string& name,
uint16_t lsn,
bool active,
diff --git a/src/MuxElements.h b/src/MuxElements.h
index ec79fdd..0f7e621 100644
--- a/src/MuxElements.h
+++ b/src/MuxElements.h
@@ -356,6 +356,9 @@ public:
// Calculate subchannel size in number of uint64_t
unsigned short getSizeDWord(void) const;
+ // Read from the input, using the correct buffer management
+ size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
+
std::string uid;
std::string inputUri;
diff --git a/src/ReedSolomon.cpp b/src/ReedSolomon.cpp
deleted file mode 100644
index 38d8ea8..0000000
--- a/src/ReedSolomon.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right
- of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "ReedSolomon.h"
-#include <vector>
-#include <algorithm>
-#include <stdexcept>
-#include <sstream>
-#include <stdio.h> // For galois.h ...
-#include <string.h> // For memcpy
-
-extern "C" {
-#include "fec/fec.h"
-}
-#include <assert.h>
-
-#define SYMSIZE 8
-
-
-ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem)
-{
- setReverse(reverse);
-
- m_N = N;
- m_K = K;
-
- const int symsize = SYMSIZE;
- const int nroots = N - K; // For EDI PFT, this must be 48
- const int pad = ((1 << symsize) - 1) - N; // is 255-N
-
- rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad);
-
- if (rsData == nullptr) {
- std::stringstream ss;
- ss << "Invalid Reed-Solomon parameters! " <<
- "N=" << N << " ; K=" << K << " ; pad=" << pad;
- throw std::invalid_argument(ss.str());
- }
-}
-
-
-ReedSolomon::~ReedSolomon()
-{
- free_rs_char(rsData);
-}
-
-
-void ReedSolomon::setReverse(bool state)
-{
- reverse = state;
-}
-
-
-int ReedSolomon::encode(void* data, void* fec, size_t size)
-{
- uint8_t* input = reinterpret_cast<uint8_t*>(data);
- uint8_t* output = reinterpret_cast<uint8_t*>(fec);
- int ret = 0;
-
- if (reverse) {
- std::vector<uint8_t> buffer(m_N);
-
- memcpy(&buffer[0], input, m_K);
- memcpy(&buffer[m_K], output, m_N - m_K);
-
- ret = decode_rs_char(rsData, &buffer[0], nullptr, 0);
- if ((ret != 0) && (ret != -1)) {
- memcpy(input, &buffer[0], m_K);
- memcpy(output, &buffer[m_K], m_N - m_K);
- }
- }
- else {
- encode_rs_char(rsData, input, output);
- }
-
- return ret;
-}
-
-
-int ReedSolomon::encode(void* data, size_t size)
-{
- uint8_t* input = reinterpret_cast<uint8_t*>(data);
- int ret = 0;
-
- if (reverse) {
- ret = decode_rs_char(rsData, input, nullptr, 0);
- }
- else {
- encode_rs_char(rsData, input, &input[m_K]);
- }
-
- return ret;
-}
diff --git a/src/ReedSolomon.h b/src/ReedSolomon.h
deleted file mode 100644
index abcef62..0000000
--- a/src/ReedSolomon.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right
- of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include <config.h>
-#endif
-
-#include <stdlib.h>
-
-class ReedSolomon
-{
-public:
- ReedSolomon(int N, int K,
- bool reverse = false,
- int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1);
- ReedSolomon(const ReedSolomon& other) = delete;
- ReedSolomon operator=(const ReedSolomon& other) = delete;
- ~ReedSolomon();
-
- void setReverse(bool state);
- int encode(void* data, void* fec, size_t size);
- int encode(void* data, size_t size);
-
-private:
- int m_N;
- int m_K;
-
- void* rsData;
- bool reverse;
-};
-
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
deleted file mode 100644
index b32c21a..0000000
--- a/src/RemoteControl.cpp
+++ /dev/null
@@ -1,595 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-#include <list>
-#include <string>
-#include <iostream>
-#include <string>
-#include <boost/asio.hpp>
-#include <boost/thread.hpp>
-
-#include "RemoteControl.h"
-
-using boost::asio::ip::tcp;
-using namespace std;
-
-RemoteControllers rcs;
-
-RemoteControllerTelnet::~RemoteControllerTelnet()
-{
- m_active = false;
- m_io_service.stop();
-
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-}
-
-void RemoteControllerTelnet::restart()
-{
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- m_restarter_thread = std::thread(
- &RemoteControllerTelnet::restart_thread,
- this, 0);
-}
-
-RemoteControllable::~RemoteControllable() {
- rcs.remove_controllable(this);
-}
-
-std::list<std::string> RemoteControllable::get_supported_parameters() const {
- std::list<std::string> parameterlist;
- for (const auto& param : m_parameters) {
- parameterlist.push_back(param[0]);
- }
- return parameterlist;
-}
-
-void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) {
- m_controllers.push_back(rc);
-}
-
-void RemoteControllers::enrol(RemoteControllable *rc) {
- controllables.push_back(rc);
-}
-
-void RemoteControllers::remove_controllable(RemoteControllable *rc) {
- controllables.remove(rc);
-}
-
-std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) {
- RemoteControllable* controllable = get_controllable_(name);
-
- std::list< std::vector<std::string> > allparams;
- for (auto &param : controllable->get_supported_parameters()) {
- std::vector<std::string> item;
- item.push_back(param);
- try {
- item.push_back(controllable->get_parameter(param));
- }
- catch (const ParameterError &e) {
- item.push_back(std::string("error: ") + e.what());
- }
-
- allparams.push_back(item);
- }
- return allparams;
-}
-
-std::string RemoteControllers::get_param(const std::string& name, const std::string& param) {
- RemoteControllable* controllable = get_controllable_(name);
- return controllable->get_parameter(param);
-}
-
-void RemoteControllers::check_faults() {
- for (auto &controller : m_controllers) {
- if (controller->fault_detected()) {
- etiLog.level(warn) <<
- "Detected Remote Control fault, restarting it";
- controller->restart();
- }
- }
-}
-
-RemoteControllable* RemoteControllers::get_controllable_(const std::string& name)
-{
- auto rc = std::find_if(controllables.begin(), controllables.end(),
- [&](RemoteControllable* r) { return r->get_rc_name() == name; });
-
- if (rc == controllables.end()) {
- throw ParameterError("Module name unknown");
- }
- else {
- return *rc;
- }
-}
-
-void RemoteControllers::set_param(
- const std::string& name,
- const std::string& param,
- const std::string& value)
-{
- etiLog.level(info) << "RC: Setting " << name << " " << param
- << " to " << value;
- RemoteControllable* controllable = get_controllable_(name);
- try {
- return controllable->set_parameter(param, value);
- }
- catch (const ios_base::failure& e) {
- etiLog.level(info) << "RC: Failed to set " << name << " " << param
- << " to " << value << ": " << e.what();
- throw ParameterError("Cannot understand value");
- }
-}
-
-// This runs in a separate thread, because
-// it would take too long to be done in the main loop
-// thread.
-void RemoteControllerTelnet::restart_thread(long)
-{
- m_active = false;
- m_io_service.stop();
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-
- m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0);
-}
-
-void RemoteControllerTelnet::handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor)
-{
-
- const std::string welcome = "ODR-DabMux Remote Control CLI\n"
- "Write 'help' for help.\n"
- "**********\n";
- const std::string prompt = "> ";
-
- std::string in_message;
- size_t length;
-
- if (boost_error) {
- etiLog.level(error) << "RC: Error accepting connection";
- return;
- }
-
- try {
- etiLog.level(info) << "RC: Accepted";
-
- boost::system::error_code ignored_error;
-
- boost::asio::write(*socket, boost::asio::buffer(welcome),
- boost::asio::transfer_all(),
- ignored_error);
-
- while (m_active && in_message != "quit") {
- boost::asio::write(*socket, boost::asio::buffer(prompt),
- boost::asio::transfer_all(),
- ignored_error);
-
- in_message = "";
-
- boost::asio::streambuf buffer;
- length = boost::asio::read_until(*socket, buffer, "\n", ignored_error);
-
- std::istream str(&buffer);
- std::getline(str, in_message);
-
- if (length == 0) {
- etiLog.level(info) << "RC: Connection terminated";
- break;
- }
-
- while (in_message.length() > 0 &&
- (in_message[in_message.length()-1] == '\r' ||
- in_message[in_message.length()-1] == '\n')) {
- in_message.erase(in_message.length()-1, 1);
- }
-
- if (in_message.length() == 0) {
- continue;
- }
-
- etiLog.level(info) << "RC: Got message '" << in_message << "'";
-
- dispatch_command(*socket, in_message);
- }
- etiLog.level(info) << "RC: Closing socket";
- socket->close();
- }
- catch (const std::exception& e)
- {
- etiLog.level(error) << "Remote control caught exception: " << e.what();
- }
-}
-
-void RemoteControllerTelnet::process(long)
-{
- m_active = true;
-
- while (m_active) {
- m_io_service.reset();
-
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
-
-
- // Add a job to start accepting connections.
- boost::shared_ptr<tcp::socket> socket(
- new tcp::socket(acceptor.get_io_service()));
-
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
- etiLog.level(info) << "RC: Waiting for connection on port " << m_port;
- acceptor.async_accept(*socket,
- boost::bind(&RemoteControllerTelnet::handle_accept,
- this,
- boost::asio::placeholders::error,
- socket,
- boost::ref(acceptor)));
-
- // Process event loop.
- m_io_service.run();
- }
-
- etiLog.level(info) << "RC: Leaving";
- m_fault = true;
-}
-
-static std::vector<std::string> tokenise(const std::string& message) {
- stringstream ss(message);
- std::vector<std::string> all_tokens;
- std::string item;
-
- while (std::getline(ss, item, ' ')) {
- all_tokens.push_back(move(item));
- }
- return all_tokens;
-}
-
-
-void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
-{
- vector<string> cmd = tokenise(command);
-
- if (cmd[0] == "help") {
- reply(socket,
- "The following commands are supported:\n"
- " list\n"
- " * Lists the modules that are loaded and their parameters\n"
- " show MODULE\n"
- " * Lists all parameters and their values from module MODULE\n"
- " get MODULE PARAMETER\n"
- " * Gets the value for the specified PARAMETER from module MODULE\n"
- " set MODULE PARAMETER VALUE\n"
- " * Sets the value for the PARAMETER ofr module MODULE\n"
- " quit\n"
- " * Terminate this session\n"
- "\n");
- }
- else if (cmd[0] == "list") {
- stringstream ss;
-
- if (cmd.size() == 1) {
- for (auto &controllable : rcs.controllables) {
- ss << controllable->get_rc_name() << endl;
-
- list< vector<string> > params = controllable->get_parameter_descriptions();
- for (auto &param : params) {
- ss << "\t" << param[0] << " : " << param[1] << endl;
- }
- }
- }
- else {
- reply(socket, "Too many arguments for command 'list'");
- }
-
- reply(socket, ss.str());
- }
- else if (cmd[0] == "show") {
- if (cmd.size() == 2) {
- try {
- stringstream ss;
- list< vector<string> > r = rcs.get_param_list_values(cmd[1]);
- for (auto &param_val : r) {
- ss << param_val[0] << ": " << param_val[1] << endl;
- }
- reply(socket, ss.str());
-
- }
- catch (const ParameterError &e) {
- reply(socket, e.what());
- }
- }
- else {
- reply(socket, "Incorrect parameters for command 'show'");
- }
- }
- else if (cmd[0] == "get") {
- if (cmd.size() == 3) {
- try {
- string r = rcs.get_param(cmd[1], cmd[2]);
- reply(socket, r);
- }
- catch (const ParameterError &e) {
- reply(socket, e.what());
- }
- }
- else {
- reply(socket, "Incorrect parameters for command 'get'");
- }
- }
- else if (cmd[0] == "set") {
- if (cmd.size() >= 4) {
- try {
- stringstream new_param_value;
- for (size_t i = 3; i < cmd.size(); i++) {
- new_param_value << cmd[i];
-
- if (i+1 < cmd.size()) {
- new_param_value << " ";
- }
- }
-
- rcs.set_param(cmd[1], cmd[2], new_param_value.str());
- reply(socket, "ok");
- }
- catch (const ParameterError &e) {
- reply(socket, e.what());
- }
- catch (const exception &e) {
- reply(socket, "Error: Invalid parameter value. ");
- }
- }
- else {
- reply(socket, "Incorrect parameters for command 'set'");
- }
- }
- else if (cmd[0] == "quit") {
- reply(socket, "Goodbye");
- }
- else {
- reply(socket, "Message not understood");
- }
-}
-
-void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
-{
- boost::system::error_code ignored_error;
- stringstream ss;
- ss << message << "\r\n";
- boost::asio::write(socket, boost::asio::buffer(ss.str()),
- boost::asio::transfer_all(),
- ignored_error);
-}
-
-
-#if defined(HAVE_RC_ZEROMQ)
-
-RemoteControllerZmq::~RemoteControllerZmq() {
- m_active = false;
- m_fault = false;
-
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-}
-
-void RemoteControllerZmq::restart()
-{
- if (m_restarter_thread.joinable()) {
- m_restarter_thread.join();
- }
-
- m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this);
-}
-
-// This runs in a separate thread, because
-// it would take too long to be done in the main loop
-// thread.
-void RemoteControllerZmq::restart_thread()
-{
- m_active = false;
-
- if (m_child_thread.joinable()) {
- m_child_thread.join();
- }
-
- m_child_thread = std::thread(&RemoteControllerZmq::process, this);
-}
-
-void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message)
-{
- bool more = true;
- do {
- zmq::message_t msg;
- pSocket.recv(&msg);
- std::string incoming((char*)msg.data(), msg.size());
- message.push_back(incoming);
- more = msg.more();
- } while (more);
-}
-
-void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)
-{
- zmq::message_t msg(2);
- char repCode[2] = {'o', 'k'};
- memcpy ((void*) msg.data(), repCode, 2);
- pSocket.send(msg, 0);
-}
-
-void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error)
-{
- zmq::message_t msg1(4);
- char repCode[4] = {'f', 'a', 'i', 'l'};
- memcpy ((void*) msg1.data(), repCode, 4);
- pSocket.send(msg1, ZMQ_SNDMORE);
-
- zmq::message_t msg2(error.length());
- memcpy ((void*) msg2.data(), error.c_str(), error.length());
- pSocket.send(msg2, 0);
-}
-
-void RemoteControllerZmq::process()
-{
- m_fault = false;
-
- // create zmq reply socket for receiving ctrl parameters
- try {
- zmq::socket_t repSocket(m_zmqContext, ZMQ_REP);
-
- // connect the socket
- int hwm = 100;
- int linger = 0;
- repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
- repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
- repSocket.bind(m_endpoint.c_str());
-
- // create pollitem that polls the ZMQ sockets
- zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} };
- while (m_active) {
- zmq::poll(pollItems, 1, 100);
- std::vector<std::string> msg;
-
- if (pollItems[0].revents & ZMQ_POLLIN) {
- recv_all(repSocket, msg);
-
- std::string command((char*)msg[0].data(), msg[0].size());
-
- if (msg.size() == 1 && command == "ping") {
- send_ok_reply(repSocket);
- }
- else if (msg.size() == 1 && command == "list") {
- size_t cohort_size = rcs.controllables.size();
- for (auto &controllable : rcs.controllables) {
- std::stringstream ss;
- ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," <<
- " \"params\": { ";
-
- list< vector<string> > params = controllable->get_parameter_descriptions();
- size_t i = 0;
- for (auto &param : params) {
- if (i > 0) {
- ss << ", ";
- }
-
- ss << "\"" << param[0] << "\": " <<
- "\"" << param[1] << "\"";
-
- i++;
- }
-
- ss << " } }";
-
- std::string msg_s = ss.str();
-
- zmq::message_t zmsg(ss.str().size());
- memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size());
-
- int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0;
- repSocket.send(zmsg, flag);
- }
- }
- else if (msg.size() == 2 && command == "show") {
- std::string module((char*) msg[1].data(), msg[1].size());
- try {
- list< vector<string> > r = rcs.get_param_list_values(module);
- size_t r_size = r.size();
- for (auto &param_val : r) {
- std::stringstream ss;
- ss << param_val[0] << ": " << param_val[1] << endl;
- zmq::message_t zmsg(ss.str().size());
- memcpy(zmsg.data(), ss.str().data(), ss.str().size());
-
- int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0;
- repSocket.send(zmsg, flag);
- }
- }
- catch (const ParameterError &err) {
- send_fail_reply(repSocket, err.what());
- }
- }
- else if (msg.size() == 3 && command == "get") {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
-
- try {
- std::string value = rcs.get_param(module, parameter);
- zmq::message_t zmsg(value.size());
- memcpy ((void*) zmsg.data(), value.data(), value.size());
- repSocket.send(zmsg, 0);
- }
- catch (const ParameterError &err) {
- send_fail_reply(repSocket, err.what());
- }
- }
- else if (msg.size() == 4 && command == "set") {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
- std::string value((char*) msg[3].data(), msg[3].size());
-
- try {
- rcs.set_param(module, parameter, value);
- send_ok_reply(repSocket);
- }
- catch (const ParameterError &err) {
- send_fail_reply(repSocket, err.what());
- }
- }
- else {
- send_fail_reply(repSocket,
- "Unsupported command. commands: list, show, get, set");
- }
- }
- }
- repSocket.close();
- }
- catch (const zmq::error_t &e) {
- etiLog.level(error) << "ZMQ RC error: " << std::string(e.what());
- }
- catch (const std::exception& e) {
- etiLog.level(error) << "ZMQ RC caught exception: " << e.what();
- m_fault = true;
- }
-}
-
-#endif
-
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
deleted file mode 100644
index 0726b28..0000000
--- a/src/RemoteControl.h
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
- Her Majesty the Queen in Right of Canada (Communications Research
- Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- This module adds remote-control capability to some of the dabmux modules.
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#if defined(HAVE_RC_ZEROMQ)
-# include "zmq.hpp"
-#endif
-
-#include <list>
-#include <map>
-#include <memory>
-#include <string>
-#include <atomic>
-#include <iostream>
-#include <boost/bind.hpp>
-#include <boost/asio.hpp>
-#include <boost/foreach.hpp>
-#include <boost/tokenizer.hpp>
-#include <thread>
-#include <stdexcept>
-
-#include "Log.h"
-
-#define RC_ADD_PARAMETER(p, desc) { \
- std::vector<std::string> p; \
- p.push_back(#p); \
- p.push_back(desc); \
- m_parameters.push_back(p); \
-}
-
-class ParameterError : public std::exception
-{
- public:
- ParameterError(std::string message) : m_message(message) {}
- ~ParameterError() throw() {}
- const char* what() const throw() { return m_message.c_str(); }
-
- private:
- std::string m_message;
-};
-
-class RemoteControllable;
-
-/* Remote controllers (that recieve orders from the user)
- * must implement BaseRemoteController
- */
-class BaseRemoteController {
- public:
- /* When this returns one, the remote controller cannot be
- * used anymore, and must be restarted by dabmux
- */
- virtual bool fault_detected() = 0;
-
- /* In case of a fault, the remote controller can be
- * restarted.
- */
- virtual void restart() = 0;
-
- virtual ~BaseRemoteController() {}
-};
-
-/* Objects that support remote control must implement the following class */
-class RemoteControllable {
- public:
- RemoteControllable(const std::string& name) :
- m_rc_name(name) {}
-
- RemoteControllable(const RemoteControllable& other) = delete;
- RemoteControllable& operator=(const RemoteControllable& other) = delete;
-
- virtual ~RemoteControllable();
-
- /* return a short name used to identify the controllable.
- * It might be used in the commands the user has to type, so keep
- * it short
- */
- virtual std::string get_rc_name() const { return m_rc_name; }
-
- /* Return a list of possible parameters that can be set */
- virtual std::list<std::string> get_supported_parameters() const;
-
- /* Return a mapping of the descriptions of all parameters */
- virtual std::list< std::vector<std::string> >
- get_parameter_descriptions() const
- {
- return m_parameters;
- }
-
- /* Base function to set parameters. */
- virtual void set_parameter(
- const std::string& parameter,
- const std::string& value) = 0;
-
- /* Getting a parameter always returns a string. */
- virtual const std::string get_parameter(const std::string& parameter) const = 0;
-
- protected:
- std::string m_rc_name;
- std::list< std::vector<std::string> > m_parameters;
-};
-
-/* Holds all our remote controllers and controlled object.
- */
-class RemoteControllers {
- public:
- void add_controller(std::shared_ptr<BaseRemoteController> rc);
- void enrol(RemoteControllable *rc);
- void remove_controllable(RemoteControllable *rc);
- void check_faults();
- std::list< std::vector<std::string> > get_param_list_values(const std::string& name);
- std::string get_param(const std::string& name, const std::string& param);
-
- void set_param(
- const std::string& name,
- const std::string& param,
- const std::string& value);
-
- std::list<RemoteControllable*> controllables;
-
- private:
- RemoteControllable* get_controllable_(const std::string& name);
-
- std::list<std::shared_ptr<BaseRemoteController> > m_controllers;
-};
-
-extern RemoteControllers rcs;
-
-/* Implements a Remote controller based on a simple telnet CLI
- * that listens on localhost
- */
-class RemoteControllerTelnet : public BaseRemoteController {
- public:
- RemoteControllerTelnet()
- : m_active(false),
- m_io_service(),
- m_fault(false),
- m_port(0) { }
-
- RemoteControllerTelnet(int port)
- : m_active(port > 0),
- m_io_service(),
- m_fault(false),
- m_port(port)
- {
- restart();
- }
-
-
- RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete;
- RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete;
-
- ~RemoteControllerTelnet();
-
- virtual bool fault_detected() { return m_fault; }
-
- virtual void restart();
-
- private:
- void restart_thread(long);
-
- void process(long);
-
- void dispatch_command(boost::asio::ip::tcp::socket& socket,
- std::string command);
-
- void reply(boost::asio::ip::tcp::socket& socket, std::string message);
-
- void handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor);
-
- std::atomic<bool> m_active;
-
- boost::asio::io_service m_io_service;
-
- /* This is set to true if a fault occurred */
- std::atomic<bool> m_fault;
- std::thread m_restarter_thread;
-
- std::thread m_child_thread;
-
- int m_port;
-};
-
-#if defined(HAVE_RC_ZEROMQ)
-/* Implements a Remote controller using zmq transportlayer
- * that listens on localhost
- */
-class RemoteControllerZmq : public BaseRemoteController {
- public:
- RemoteControllerZmq()
- : m_active(false), m_fault(false),
- m_zmqContext(1),
- m_endpoint("") { }
-
- RemoteControllerZmq(const std::string& endpoint)
- : m_active(not endpoint.empty()), m_fault(false),
- m_zmqContext(1),
- m_endpoint(endpoint),
- m_child_thread(&RemoteControllerZmq::process, this) { }
-
- RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete;
- RemoteControllerZmq(const RemoteControllerZmq& other) = delete;
-
- ~RemoteControllerZmq();
-
- virtual bool fault_detected() { return m_fault; }
-
- virtual void restart();
-
- private:
- void restart_thread();
-
- void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message);
- void send_ok_reply(zmq::socket_t &pSocket);
- void send_fail_reply(zmq::socket_t &pSocket, const std::string &error);
- void process();
-
- std::atomic<bool> m_active;
-
- /* This is set to true if a fault occurred */
- std::atomic<bool> m_fault;
- std::thread m_restarter_thread;
-
- zmq::context_t m_zmqContext;
-
- std::string m_endpoint;
- std::thread m_child_thread;
-};
-#endif
-
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp
deleted file mode 100644
index 3ebe73c..0000000
--- a/src/TcpSocket.cpp
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "TcpSocket.h"
-#include "Log.h"
-#include <iostream>
-#include <cstdio>
-#include <cstring>
-#include <cstdint>
-#include <signal.h>
-#include <errno.h>
-#include <poll.h>
-#include <thread>
-
-using namespace std;
-
-using vec_u8 = std::vector<uint8_t>;
-
-
-TcpSocket::TcpSocket() :
- m_sock(INVALID_SOCKET)
-{
-}
-
-TcpSocket::TcpSocket(int port, const string& name) :
- m_sock(INVALID_SOCKET)
-{
- if (port) {
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
- throw std::runtime_error("Can't create socket");
- }
-
- reuseopt_t reuse = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- throw std::runtime_error("Can't reuse address");
- }
-
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
- == SOCKET_ERROR) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
- }
-#endif
-
- m_own_address.setAddress(name);
- m_own_address.setPort(port);
-
- if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- ::close(m_sock);
- m_sock = INVALID_SOCKET;
- throw std::runtime_error("Can't bind socket");
- }
- }
-}
-
-TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) :
- m_own_address(own),
- m_remote_address(remote),
- m_sock(sock) { }
-
-// The move constructors must ensure the moved-from
-// TcpSocket won't destroy our socket handle
-TcpSocket::TcpSocket(TcpSocket&& other)
-{
- m_sock = other.m_sock;
- other.m_sock = INVALID_SOCKET;
-
- m_own_address = other.m_own_address;
- m_remote_address = other.m_remote_address;
-}
-
-TcpSocket& TcpSocket::operator=(TcpSocket&& other)
-{
- m_sock = other.m_sock;
- other.m_sock = INVALID_SOCKET;
-
- m_own_address = other.m_own_address;
- m_remote_address = other.m_remote_address;
- return *this;
-}
-
-/**
- * Close the underlying socket.
- * @return 0 if ok
- * -1 if error
- */
-int TcpSocket::close()
-{
- if (m_sock != INVALID_SOCKET) {
- int res = ::close(m_sock);
- if (res != 0) {
- setInetError("Can't close socket");
- return -1;
- }
- m_sock = INVALID_SOCKET;
- }
- return 0;
-}
-
-TcpSocket::~TcpSocket()
-{
- close();
-}
-
-bool TcpSocket::isValid()
-{
- return m_sock != INVALID_SOCKET;
-}
-
-ssize_t TcpSocket::recv(void* data, size_t size)
-{
- ssize_t ret = ::recv(m_sock, (char*)data, size, 0);
- if (ret == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket recv error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- return ret;
-}
-
-
-ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)
-{
- if (timeout_ms) {
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLOUT;
-
- const int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- stringstream ss;
- ss << "TCP Socket send error on poll(): " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else if (retval == 0) {
- // Timed out
- return 0;
- }
- }
-
- /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
- * receive a SIGPIPE and die.
- * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
-#if defined(HAVE_MSG_NOSIGNAL)
- const int flags = MSG_NOSIGNAL;
-#else
- const int flags = 0;
-#endif
- const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
-
- if (ret == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket send error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- return ret;
-}
-
-void TcpSocket::listen()
-{
- if (::listen(m_sock, 1) == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket listen error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
-}
-
-TcpSocket TcpSocket::accept()
-{
- InetAddress remote_addr;
- socklen_t addrLen = sizeof(sockaddr_in);
-
- SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen);
- if (socket == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket accept error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else {
- TcpSocket client(socket, m_own_address, remote_addr);
- return client;
- }
-}
-
-TcpSocket TcpSocket::accept(int timeout_ms)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN | POLLOUT;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- stringstream ss;
- ss << "TCP Socket accept error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else if (retval) {
- return accept();
- }
- else {
- TcpSocket invalidsock(0, "");
- return invalidsock;
- }
-}
-
-
-InetAddress TcpSocket::getOwnAddress() const
-{
- return m_own_address;
-}
-
-InetAddress TcpSocket::getRemoteAddress() const
-{
- return m_remote_address;
-}
-
-
-TCPConnection::TCPConnection(TcpSocket&& sock) :
- queue(),
- m_running(true),
- m_sender_thread(),
- m_sock(move(sock))
-{
- auto own_addr = m_sock.getOwnAddress();
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "New TCP Connection on port " <<
- own_addr.getPort() << " from " <<
- addr.getHostAddress() << ":" << addr.getPort();
- m_sender_thread = std::thread(&TCPConnection::process, this);
-}
-
-TCPConnection::~TCPConnection()
-{
- m_running = false;
- vec_u8 termination_marker;
- queue.push(termination_marker);
- m_sender_thread.join();
-}
-
-void TCPConnection::process()
-{
- while (m_running) {
- vec_u8 data;
- queue.wait_and_pop(data);
-
- if (data.empty()) {
- // empty vector is the termination marker
- m_running = false;
- break;
- }
-
- try {
- ssize_t remaining = data.size();
- const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
- const int timeout_ms = 10; // Less than one ETI frame
-
- while (m_running and remaining > 0) {
- const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
- if (sent < 0 or sent > remaining) {
- throw std::logic_error("Invalid TcpSocket::send() return value");
- }
- remaining -= sent;
- buf += sent;
- }
- }
- catch (const std::runtime_error& e) {
- m_running = false;
- }
- }
-
-
- auto own_addr = m_sock.getOwnAddress();
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "Dropping TCP Connection on port " <<
- own_addr.getPort() << " from " <<
- addr.getHostAddress() << ":" << addr.getPort();
-}
-
-
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
- m_max_queue_size(max_queue_size)
-{
-}
-
-TCPDataDispatcher::~TCPDataDispatcher()
-{
- m_running = false;
- m_connections.clear();
- m_listener_socket.close();
- m_listener_thread.join();
-}
-
-void TCPDataDispatcher::start(int port, const string& address)
-{
- TcpSocket sock(port, address);
- m_listener_socket = move(sock);
-
- m_running = true;
- m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
-}
-
-void TCPDataDispatcher::write(const vec_u8& data)
-{
- for (auto& connection : m_connections) {
- connection.queue.push(data);
- }
-
- m_connections.remove_if(
- [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
-}
-
-void TCPDataDispatcher::process()
-{
- try {
- m_listener_socket.listen();
-
- const int timeout_ms = 1000;
-
- while (m_running) {
- // Add a new TCPConnection to the list, constructing it from the client socket
- auto sock = m_listener_socket.accept(timeout_ms);
- if (sock.isValid()) {
- m_connections.emplace(m_connections.begin(), move(sock));
- }
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what();
- m_running = false;
- }
-}
-
diff --git a/src/TcpSocket.h b/src/TcpSocket.h
deleted file mode 100644
index ec7afd3..0000000
--- a/src/TcpSocket.h
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _TCPSOCKET
-#define _TCPSOCKET
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include "ThreadsafeQueue.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <iostream>
-#include <string>
-#include <vector>
-#include <memory>
-#include <atomic>
-#include <thread>
-#include <list>
-
-/**
- * This class represents a TCP socket.
- */
-class TcpSocket
-{
- public:
- /** Create a new socket that does nothing */
- TcpSocket();
-
- /** Create a new socket listening for incoming connections.
- * @param port The port number on which the socket will listen.
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- TcpSocket(int port, const std::string& name);
- ~TcpSocket();
- TcpSocket(TcpSocket&& other);
- TcpSocket& operator=(TcpSocket&& other);
- TcpSocket(const TcpSocket& other) = delete;
- TcpSocket& operator=(const TcpSocket& other) = delete;
-
- bool isValid(void);
-
- int close(void);
-
- /** Send data over the TCP connection.
- * @param data The buffer that will be sent.
- * @param size Number of bytes to send.
- * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
- * return number of bytes sent, 0 on timeout, or throws runtime_error.
- */
- ssize_t send(const void* data, size_t size, int timeout_ms=0);
-
- /** Receive data from the socket.
- * @param data The buffer that will receive data.
- * @param size The buffer size.
- * @return number of bytes received or -1 (SOCKET_ERROR) if error
- */
- ssize_t recv(void* data, size_t size);
-
- void listen(void);
- TcpSocket accept(void);
-
- /* Returns either valid socket if a connection was
- * accepted before the timeout expired, or an invalid
- * socket otherwise.
- */
- TcpSocket accept(int timeout_ms);
-
- /** Retrieve address this socket is bound to */
- InetAddress getOwnAddress() const;
- InetAddress getRemoteAddress() const;
-
- private:
- TcpSocket(SOCKET sock, InetAddress own, InetAddress remote);
-
- /// The address on which the socket is bound.
- InetAddress m_own_address;
- InetAddress m_remote_address;
- /// The low-level socket used by system functions.
- SOCKET m_sock;
-};
-
-/* Helper class for TCPDataDispatcher, contains a queue of pending data and
- * a sender thread. */
-class TCPConnection
-{
- public:
- TCPConnection(TcpSocket&& sock);
- TCPConnection(const TCPConnection&) = delete;
- TCPConnection& operator=(const TCPConnection&) = delete;
- ~TCPConnection();
-
- ThreadsafeQueue<std::vector<uint8_t> > queue;
-
- private:
- std::atomic<bool> m_running;
- std::thread m_sender_thread;
- TcpSocket m_sock;
-
- void process(void);
-};
-
-/* Send a TCP stream to several destinations, and automatically disconnect destinations
- * whose buffer overflows.
- */
-class TCPDataDispatcher
-{
- public:
- TCPDataDispatcher(size_t max_queue_size);
- ~TCPDataDispatcher();
- TCPDataDispatcher(const TCPDataDispatcher&) = delete;
- TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
-
- void start(int port, const std::string& address);
- void write(const std::vector<uint8_t>& data);
-
- private:
- void process(void);
-
- size_t m_max_queue_size;
-
- std::atomic<bool> m_running;
- std::thread m_listener_thread;
- TcpSocket m_listener_socket;
- std::list<TCPConnection> m_connections;
-};
-
-#endif // _TCPSOCKET
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
deleted file mode 100644
index ab287b2..0000000
--- a/src/ThreadsafeQueue.h
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- An implementation for a threadsafe queue, depends on C++11
-
- When creating a ThreadsafeQueue, one can specify the minimal number
- of elements it must contain before it is possible to take one
- element out.
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <utility>
-
-/* This queue is meant to be used by two threads. One producer
- * that pushes elements into the queue, and one consumer that
- * retrieves the elements.
- *
- * The queue can make the consumer block until an element
- * is available, or a wakeup requested.
- */
-
-/* Class thrown by blocking pop to tell the consumer
- * that there's a wakeup requested. */
-class ThreadsafeQueueWakeup {};
-
-template<typename T>
-class ThreadsafeQueue
-{
-public:
- /* Push one element into the queue, and notify another thread that
- * might be waiting.
- *
- * returns the new queue size.
- */
- size_t push(T const& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- size_t push(T&& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.emplace(std::move(val));
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Push one element into the queue, but wait until the
- * queue size goes below the threshold.
- *
- * Notify waiting thread.
- *
- * returns the new queue size.
- */
- size_t push_wait_if_full(T const& val, size_t threshold)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() >= threshold) {
- the_tx_notification.wait(lock);
- }
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Trigger a wakeup event on a blocking consumer, which
- * will receive a ThreadsafeQueueWakeup exception.
- */
- void trigger_wakeup(void)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- wakeup_requested = true;
- lock.unlock();
- the_rx_notification.notify_one();
- }
-
- /* Send a notification for the receiver thread */
- void notify(void)
- {
- the_rx_notification.notify_one();
- }
-
- bool empty() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.empty();
- }
-
- size_t size() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.size();
- }
-
- bool try_pop(T& popped_value)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- if (the_queue.empty()) {
- return false;
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
-
- return true;
- }
-
- void wait_and_pop(T& popped_value, size_t prebuffering = 1)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering and
- not wakeup_requested) {
- the_rx_notification.wait(lock);
- }
-
- if (wakeup_requested) {
- wakeup_requested = false;
- throw ThreadsafeQueueWakeup();
- }
- else {
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
- }
- }
-
-private:
- std::queue<T> the_queue;
- mutable std::mutex the_mutex;
- std::condition_variable the_rx_notification;
- std::condition_variable the_tx_notification;
- bool wakeup_requested = false;
-};
-
diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp
deleted file mode 100644
index 3d015ec..0000000
--- a/src/UdpSocket.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "UdpSocket.h"
-
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-
-using namespace std;
-
-UdpSocket::UdpSocket() :
- listenSocket(INVALID_SOCKET)
-{
- reinit(0, "");
-}
-
-UdpSocket::UdpSocket(int port) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, "");
-}
-
-UdpSocket::UdpSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, name);
-}
-
-
-int UdpSocket::setBlocking(bool block)
-{
- int res;
- if (block)
- res = fcntl(listenSocket, F_SETFL, 0);
- else
- res = fcntl(listenSocket, F_SETFL, O_NONBLOCK);
- if (res == SOCKET_ERROR) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-}
-
-int UdpSocket::reinit(int port, const std::string& name)
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
- setInetError("Can't create socket");
- return -1;
- }
- reuseopt_t reuse = 1;
- if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- setInetError("Can't reuse address");
- return -1;
- }
-
- if (port) {
- address.setAddress(name);
- address.setPort(port);
-
- if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- setInetError("Can't bind socket");
- ::close(listenSocket);
- listenSocket = INVALID_SOCKET;
- return -1;
- }
- }
- return 0;
-}
-
-int UdpSocket::close()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- listenSocket = INVALID_SOCKET;
-
- return 0;
-}
-
-UdpSocket::~UdpSocket()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-}
-
-
-int UdpSocket::receive(UdpPacket& packet)
-{
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- ssize_t ret = recvfrom(listenSocket,
- packet.getData(),
- packet.getSize(),
- 0,
- packet.getAddress().getAddress(),
- &addrSize);
-
- if (ret == SOCKET_ERROR) {
- packet.setSize(0);
- if (errno == EAGAIN) {
- return 0;
- }
- setInetError("Can't receive UDP packet");
- return -1;
- }
-
- packet.setSize(ret);
- return 0;
-}
-
-int UdpSocket::send(UdpPacket& packet)
-{
- int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0,
- packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
-{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
- destination.getAddress(), sizeof(*destination.getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::joinGroup(char* groupname)
-{
- ip_mreqn group;
- if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
- setInetError(groupname);
- return -1;
- }
- if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- setInetError("Not a multicast address");
- return -1;
- }
- group.imr_address.s_addr = htons(INADDR_ANY);;
- group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- }
- return 0;
-}
-
-int UdpSocket::setMulticastTTL(int ttl)
-{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
- == SOCKET_ERROR) {
- setInetError("Can't set ttl");
- return -1;
- }
-
- return 0;
-}
-
-int UdpSocket::setMulticastSource(const char* source_addr)
-{
- struct in_addr addr;
- if (inet_aton(source_addr, &addr) == 0) {
- setInetError("Can't parse source address");
- return -1;
- }
-
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
- == SOCKET_ERROR) {
- setInetError("Can't set source address");
- return -1;
- }
-
- return 0;
-}
-
-UdpPacket::UdpPacket() { }
-
-UdpPacket::UdpPacket(size_t initSize) :
- m_buffer(initSize)
-{ }
-
-
-void UdpPacket::setSize(size_t newSize)
-{
- m_buffer.resize(newSize);
-}
-
-
-uint8_t* UdpPacket::getData()
-{
- return &m_buffer[0];
-}
-
-
-void UdpPacket::addData(const void *data, size_t size)
-{
- uint8_t *d = (uint8_t*)data;
- std::copy(d, d + size, std::back_inserter(m_buffer));
-}
-
-size_t UdpPacket::getSize()
-{
- return m_buffer.size();
-}
-
-InetAddress UdpPacket::getAddress()
-{
- return address;
-}
-
diff --git a/src/UdpSocket.h b/src/UdpSocket.h
deleted file mode 100644
index f51e87c..0000000
--- a/src/UdpSocket.h
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <stdlib.h>
-#include <iostream>
-#include <vector>
-
-class UdpPacket;
-
-
-/**
- * This class represents a socket for sending and receiving UDP packets.
- *
- * A UDP socket is the sending or receiving point for a packet delivery service.
- * Each packet sent or received on a datagram socket is individually
- * addressed and routed. Multiple packets sent from one machine to another may
- * be routed differently, and may arrive in any order.
- */
-class UdpSocket
-{
- public:
- /** Create a new socket that will not be bound to any port. To be used
- * for data output.
- */
- UdpSocket();
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- */
- UdpSocket(int port);
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- UdpSocket(int port, const std::string& name);
- ~UdpSocket();
- UdpSocket(const UdpSocket& other) = delete;
- const UdpSocket& operator=(const UdpSocket& other) = delete;
-
- /** reinitialise socket. Close the already open socket, and
- * create a new one
- */
- int reinit(int port, const std::string& name);
-
- /** Close the socket
- */
- int close(void);
-
- /** Send an UDP packet.
- * @param packet The UDP packet to be sent. It includes the data and the
- * destination address
- * return 0 if ok, -1 if error
- */
- int send(UdpPacket& packet);
-
- /** Send an UDP packet
- *
- * return 0 if ok, -1 if error
- */
- int send(const std::vector<uint8_t>& data, InetAddress destination);
-
- /** Receive an UDP packet.
- * @param packet The packet that will receive the data. The address will be set
- * to the source address.
- * @return 0 if ok, -1 if error
- */
- int receive(UdpPacket& packet);
-
- int joinGroup(char* groupname);
- int setMulticastSource(const char* source_addr);
- int setMulticastTTL(int ttl);
-
- /** Set blocking mode. By default, the socket is blocking.
- * @return 0 if ok
- * -1 if error
- */
- int setBlocking(bool block);
-
- protected:
-
- /// The address on which the socket is bound.
- InetAddress address;
- /// The low-level socket used by system functions.
- SOCKET listenSocket;
-};
-
-/** This class represents a UDP packet.
- *
- * A UDP packet contains a payload (sequence of bytes) and an address. For
- * outgoing packets, the address is the destination address. For incoming
- * packets, the address tells the user from what source the packet arrived from.
- */
-class UdpPacket
-{
- public:
- /** Construct an empty UDP packet.
- */
- UdpPacket();
- UdpPacket(size_t initSize);
-
- /** Give the pointer to data.
- * @return The pointer
- */
- uint8_t* getData(void);
-
- /** Append some data at the end of data buffer and adjust size.
- * @param data Pointer to the data to add
- * @param size Size in bytes of new data
- */
- void addData(const void *data, size_t size);
-
- size_t getSize(void);
-
- /** Changes size of the data buffer size. Keeps data intact unless
- * truncated.
- */
- void setSize(size_t newSize);
-
- /** Returns the UDP address of the packet.
- */
- InetAddress getAddress(void);
-
- const std::vector<uint8_t>& getBuffer(void) const {
- return m_buffer;
- }
-
-
- private:
- std::vector<uint8_t> m_buffer;
- InetAddress address;
-};
-
diff --git a/src/crc.c b/src/crc.c
deleted file mode 100644
index cc02473..0000000
--- a/src/crc.c
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "crc.h"
-#ifndef _WIN32
-# include <unistd.h>
-# include <netinet/in.h>
-#endif
-#include <stdio.h>
-#include <fcntl.h>
-
-//#define CCITT 0x1021
-
-uint8_t crc8tab[256] = {
- 0x00, 0x07, 0x0e, 0x09, 0x1c, 0x1b, 0x12, 0x15,
- 0x38, 0x3f, 0x36, 0x31, 0x24, 0x23, 0x2a, 0x2d,
- 0x70, 0x77, 0x7e, 0x79, 0x6c, 0x6b, 0x62, 0x65,
- 0x48, 0x4f, 0x46, 0x41, 0x54, 0x53, 0x5a, 0x5d,
- 0xe0, 0xe7, 0xee, 0xe9, 0xfc, 0xfb, 0xf2, 0xf5,
- 0xd8, 0xdf, 0xd6, 0xd1, 0xc4, 0xc3, 0xca, 0xcd,
- 0x90, 0x97, 0x9e, 0x99, 0x8c, 0x8b, 0x82, 0x85,
- 0xa8, 0xaf, 0xa6, 0xa1, 0xb4, 0xb3, 0xba, 0xbd,
- 0xc7, 0xc0, 0xc9, 0xce, 0xdb, 0xdc, 0xd5, 0xd2,
- 0xff, 0xf8, 0xf1, 0xf6, 0xe3, 0xe4, 0xed, 0xea,
- 0xb7, 0xb0, 0xb9, 0xbe, 0xab, 0xac, 0xa5, 0xa2,
- 0x8f, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9d, 0x9a,
- 0x27, 0x20, 0x29, 0x2e, 0x3b, 0x3c, 0x35, 0x32,
- 0x1f, 0x18, 0x11, 0x16, 0x03, 0x04, 0x0d, 0x0a,
- 0x57, 0x50, 0x59, 0x5e, 0x4b, 0x4c, 0x45, 0x42,
- 0x6f, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7d, 0x7a,
- 0x89, 0x8e, 0x87, 0x80, 0x95, 0x92, 0x9b, 0x9c,
- 0xb1, 0xb6, 0xbf, 0xb8, 0xad, 0xaa, 0xa3, 0xa4,
- 0xf9, 0xfe, 0xf7, 0xf0, 0xe5, 0xe2, 0xeb, 0xec,
- 0xc1, 0xc6, 0xcf, 0xc8, 0xdd, 0xda, 0xd3, 0xd4,
- 0x69, 0x6e, 0x67, 0x60, 0x75, 0x72, 0x7b, 0x7c,
- 0x51, 0x56, 0x5f, 0x58, 0x4d, 0x4a, 0x43, 0x44,
- 0x19, 0x1e, 0x17, 0x10, 0x05, 0x02, 0x0b, 0x0c,
- 0x21, 0x26, 0x2f, 0x28, 0x3d, 0x3a, 0x33, 0x34,
- 0x4e, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5c, 0x5b,
- 0x76, 0x71, 0x78, 0x7f, 0x6a, 0x6d, 0x64, 0x63,
- 0x3e, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2c, 0x2b,
- 0x06, 0x01, 0x08, 0x0f, 0x1a, 0x1d, 0x14, 0x13,
- 0xae, 0xa9, 0xa0, 0xa7, 0xb2, 0xb5, 0xbc, 0xbb,
- 0x96, 0x91, 0x98, 0x9f, 0x8a, 0x8d, 0x84, 0x83,
- 0xde, 0xd9, 0xd0, 0xd7, 0xc2, 0xc5, 0xcc, 0xcb,
- 0xe6, 0xe1, 0xe8, 0xef, 0xfa, 0xfd, 0xf4, 0xf3
-};
-
-
-uint16_t crc16tab[256] = {
- 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
- 0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
- 0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
- 0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
- 0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
- 0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
- 0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
- 0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
- 0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
- 0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
- 0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
- 0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
- 0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
- 0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
- 0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
- 0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
- 0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
- 0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
- 0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
- 0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
- 0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
- 0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
- 0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
- 0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
- 0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
- 0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
- 0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
- 0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
- 0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
- 0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
- 0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
- 0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0
-};
-
-
-uint32_t crc32tab[256] = {
- 0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9,
- 0x130476dc, 0x17c56b6b, 0x1a864db2, 0x1e475005,
- 0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
- 0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd,
- 0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9,
- 0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
- 0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011,
- 0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd,
- 0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
- 0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5,
- 0xbe2b5b58, 0xbaea46ef, 0xb7a96036, 0xb3687d81,
- 0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
- 0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49,
- 0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95,
- 0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
- 0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d,
- 0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae,
- 0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
- 0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16,
- 0x018aeb13, 0x054bf6a4, 0x0808d07d, 0x0cc9cdca,
- 0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
- 0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02,
- 0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066,
- 0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
- 0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e,
- 0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692,
- 0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
- 0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a,
- 0xe0b41de7, 0xe4750050, 0xe9362689, 0xedf73b3e,
- 0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
- 0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686,
- 0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a,
- 0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
- 0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb,
- 0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f,
- 0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
- 0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47,
- 0x36194d42, 0x32d850f5, 0x3f9b762c, 0x3b5a6b9b,
- 0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
- 0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623,
- 0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7,
- 0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
- 0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f,
- 0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3,
- 0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
- 0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b,
- 0x9b3660c6, 0x9ff77d71, 0x92b45ba8, 0x9675461f,
- 0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
- 0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640,
- 0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c,
- 0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
- 0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24,
- 0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30,
- 0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
- 0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088,
- 0x2497d08d, 0x2056cd3a, 0x2d15ebe3, 0x29d4f654,
- 0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
- 0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c,
- 0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18,
- 0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
- 0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0,
- 0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c,
- 0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
- 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
-};
-
-// This function can be used to create a new table with a different polynom
-void init_crc8tab(uint8_t l_code, uint8_t l_init)
-{
- unsigned i, j, msb;
- uint8_t nb;
- uint8_t crc;
-
- for (i = 0; i < 256; ++i) {
- crc = l_init;
- nb = i ^ 0xff;
- for (j = 0; j < 8; ++j) {
- msb = (nb & (0x80 >> j)) && 1;
- msb ^= (crc >> 7);
- crc <<= 1;
- if (msb)
- crc ^= l_code;
- }
- crc8tab[i] = crc;
- }
-}
-
-
-void init_crc16tab(uint16_t l_code, uint16_t l_init)
-{
- unsigned i, j, msb;
- uint8_t nb;
- uint16_t crc;
-
- for (i = 0; i < 256; ++i) {
- crc = l_init;
- nb = i ^ 0xff;
- for (j = 0; j < 8; ++j) {
- msb = (nb & (0x80 >> j)) && 1;
- msb ^= (crc >> 15);
- crc <<= 1;
- if (msb)
- crc ^= l_code;
- }
- crc ^= 0xff00;
- crc16tab[i] = crc;
- }
-}
-
-
-void init_crc32tab(uint32_t l_code, uint32_t l_init)
-{
- unsigned i, j, msb;
- uint8_t nb;
- uint32_t crc;
-
- for (i = 0; i < 256; ++i) {
- crc = l_init;
- nb = i ^ 0xff;
- for (j = 0; j < 8; ++j) {
- msb = (nb & (0x80 >> j)) && 1;
- msb ^= (crc >> 31);
- crc <<= 1;
- if (msb)
- crc ^= l_code;
- }
- crc ^= 0xffffff00;
- crc32tab[i] = crc;
- }
-}
-
-
-uint8_t crc8(uint8_t l_crc, const void *lp_data, unsigned l_nb)
-{
- const uint8_t* data = (const uint8_t*)lp_data;
- while (l_nb--) {
- l_crc = crc8tab[l_crc ^ *(data++)];
- }
- return (l_crc);
-}
-
-
-uint16_t crc16(uint16_t l_crc, const void *lp_data, unsigned l_nb)
-{
- const uint8_t* data = (const uint8_t*)lp_data;
- while (l_nb--) {
- l_crc =
- (l_crc << 8) ^ crc16tab[(l_crc >> 8) ^ *(data++)];
- }
- return (l_crc);
-}
-
-
-uint32_t crc32(uint32_t l_crc, const void *lp_data, unsigned l_nb)
-{
- const uint8_t* data = (const uint8_t*)lp_data;
- while (l_nb--) {
- l_crc =
- (l_crc << 8) ^ crc32tab[((l_crc >> 24) ^ *(data++)) & 0xff];
- }
- return (l_crc);
-}
diff --git a/src/crc.h b/src/crc.h
deleted file mode 100644
index b1785a1..0000000
--- a/src/crc.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _CRC
-#define _CRC
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#ifndef _WIN32
- #include <stdint.h>
-#else
- #include <winsock2.h> // For types...
- typedef BYTE uint8_t;
- typedef WORD uint16_t;
- typedef DWORD32 uint32_t;
-#endif
-
-
-#ifdef __cplusplus
-extern "C" { // }
-#endif
-
-void init_crc8tab(uint8_t l_code, uint8_t l_init);
-uint8_t crc8(uint8_t l_crc, const void *lp_data, unsigned l_nb);
-extern uint8_t crc8tab[];
-
-void init_crc16tab(uint16_t l_code, uint16_t l_init);
-uint16_t crc16(uint16_t l_crc, const void *lp_data, unsigned l_nb);
-extern uint16_t crc16tab[];
-
-void init_crc32tab(uint32_t l_code, uint32_t l_init);
-uint32_t crc32(uint32_t l_crc, const void *lp_data, unsigned l_nb);
-extern uint32_t crc32tab[];
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif //_CRC
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index 9cc18d7..c7e570b 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -28,8 +28,7 @@
#pragma once
-#include "UdpSocket.h"
-#include "TcpSocket.h"
+#include "Socket.h"
#include "Log.h"
#include "string.h"
#include <stdexcept>
@@ -57,6 +56,8 @@ class DabOutput
{
return Open(name.c_str());
}
+
+ // Return -1 on failure
virtual int Write(void* buffer, int size) = 0;
virtual int Close() = 0;
@@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput
class DabOutputUdp : public DabOutput
{
public:
- DabOutputUdp() {
- packet_ = new UdpPacket(6144);
- socket_ = new UdpSocket();
- }
-
- virtual ~DabOutputUdp() {
- delete socket_;
- delete packet_;
- }
+ DabOutputUdp();
int Open(const char* name);
int Write(void* buffer, int size);
@@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput
DabOutputUdp operator=(const DabOutputUdp& other) = delete;
std::string uri_;
- UdpSocket* socket_;
- UdpPacket* packet_;
+ Socket::UDPSocket socket_;
+ Socket::UDPPacket packet_;
};
// -------------- TCP ------------------
@@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput
private:
std::string uri_;
- std::shared_ptr<TCPDataDispatcher> dispatcher_;
+ std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_;
};
// -------------- Simul ------------------
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 87dbfd5..4dc3538 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)
uri_ = name;
if (success) {
- dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
+ dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
dispatcher_->start(port, address);
}
else {
diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp
index c129569..b9c22db 100644
--- a/src/dabOutput/dabOutputUdp.cpp
+++ b/src/dabOutput/dabOutputUdp.cpp
@@ -38,18 +38,12 @@
#include <cstdio>
#include <limits.h>
#include "dabOutput.h"
-#include "UdpSocket.h"
-
-#ifdef _WIN32
-# include <fscfg.h>
-# include <sdci.h>
-#else
-# include <netinet/in.h>
-# include <sys/types.h>
-# include <sys/socket.h>
-# include <sys/ioctl.h>
-# include <net/if_arp.h>
-#endif
+#include "Socket.h"
+
+DabOutputUdp::DabOutputUdp() :
+ socket_(),
+ packet_(6144)
+{ }
int DabOutputUdp::Open(const char* name)
{
@@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name)
regex_constants::match_default)) {
string address = what[1];
- if (this->packet_->getAddress().setAddress(address.c_str()) == -1) {
- etiLog.level(error) << "can't set address " <<
- address << "(" << inetErrDesc << ": " << inetErrMsg << ")";
- return -1;
- }
-
string port_str = what[2];
long port = std::strtol(port_str.c_str(), nullptr, 0);
@@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name)
return -1;
}
- this->packet_->getAddress().setPort(port);
+ packet_.address.resolveUdpDestination(address, port);
string query_params = what[3];
smatch query_what;
@@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name)
regex_constants::match_default)) {
string src = query_what[1];
- int err = socket_->setMulticastSource(src.c_str());
- if (err) {
- etiLog.level(error) << "UDP output socket set source failed!";
- return -1;
- }
+ try {
+ socket_.setMulticastSource(src.c_str());
- string ttl_str = query_what[2];
+ string ttl_str = query_what[2];
- if (not ttl_str.empty()) {
- long ttl = std::strtol(ttl_str.c_str(), nullptr, 0);
- if ((ttl <= 0) || (ttl >= 255)) {
- etiLog.level(error) << "Invalid TTL setting in " <<
- uri_without_proto;
- return -1;
- }
+ if (not ttl_str.empty()) {
+ long ttl = std::strtol(ttl_str.c_str(), nullptr, 0);
+ if ((ttl <= 0) || (ttl >= 255)) {
+ etiLog.level(error) << "Invalid TTL setting in " <<
+ uri_without_proto;
+ return -1;
+ }
- err = socket_->setMulticastTTL(ttl);
- if (err) {
- etiLog.level(error) << "UDP output socket set TTL failed!";
- return -1;
+ socket_.setMulticastTTL(ttl);
}
}
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "Failed to set UDP output settings" << e.what();
+ }
}
else if (not query_params.empty()) {
etiLog.level(error) << "UDP output: could not parse parameters " <<
@@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name)
int DabOutputUdp::Write(void* buffer, int size)
{
- this->packet_->setSize(0);
- this->packet_->addData(buffer, size);
- return this->socket_->send(*this->packet_);
+ const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer);
+ packet_.buffer.resize(0);
+ std::copy(buf, buf + size, std::back_inserter(packet_.buffer));
+ socket_.send(packet_);
+ return 0;
}
#endif // defined(HAVE_OUTPUT_UDP)
diff --git a/src/dabOutput/edi/AFPacket.cpp b/src/dabOutput/edi/AFPacket.cpp
deleted file mode 100644
index a58a980..0000000
--- a/src/dabOutput/edi/AFPacket.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- Copyright (C) 2014
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output.
- This implements an AF Packet as defined ETSI TS 102 821.
- Also see ETSI TS 102 693
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-#include "config.h"
-#include "crc.h"
-#include "AFPacket.h"
-#include "TagItems.h"
-#include "TagPacket.h"
-#include <vector>
-#include <string>
-#include <iostream>
-#include <cstdio>
-#include <stdint.h>
-#include <arpa/inet.h>
-
-namespace edi {
-
-// Header PT field. AF packet contains TAG payload
-const uint8_t AFHEADER_PT_TAG = 'T';
-
-// AF Packet Major (3 bits) and Minor (4 bits) version
-const uint8_t AFHEADER_VERSION = 0x10; // MAJ=1, MIN=0
-
-AFPacket AFPacketiser::Assemble(TagPacket tag_packet)
-{
- std::vector<uint8_t> payload = tag_packet.Assemble();
-
- if (m_verbose)
- std::cerr << "Assemble AFPacket " << seq << std::endl;
-
- std::string pack_data("AF"); // SYNC
- std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
-
- uint32_t taglength = payload.size();
-
- if (m_verbose)
- std::cerr << " AFPacket payload size " << payload.size() << std::endl;
-
- // write length into packet
- packet.push_back((taglength >> 24) & 0xFF);
- packet.push_back((taglength >> 16) & 0xFF);
- packet.push_back((taglength >> 8) & 0xFF);
- packet.push_back(taglength & 0xFF);
-
- // fill rest of header
- packet.push_back(seq >> 8);
- packet.push_back(seq & 0xFF);
- seq++;
- packet.push_back((have_crc ? 0x80 : 0) | AFHEADER_VERSION); // ar_cf: CRC=1
- packet.push_back(AFHEADER_PT_TAG);
-
- // insert payload, must have a length multiple of 8 bytes
- packet.insert(packet.end(), payload.begin(), payload.end());
-
- // calculate CRC over AF Header and payload
- uint16_t crc = 0xffff;
- crc = crc16(crc, &(packet.front()), packet.size());
- crc ^= 0xffff;
-
- if (m_verbose)
- fprintf(stderr, " AFPacket crc %x\n", crc);
-
- packet.push_back((crc >> 8) & 0xFF);
- packet.push_back(crc & 0xFF);
-
- if (m_verbose)
- std::cerr << " AFPacket length " << packet.size() << std::endl;
-
- return packet;
-}
-
-}
diff --git a/src/dabOutput/edi/AFPacket.h b/src/dabOutput/edi/AFPacket.h
deleted file mode 100644
index b4ccef1..0000000
--- a/src/dabOutput/edi/AFPacket.h
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- Copyright (C) 2014
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output.
- This implements an AF Packet as defined ETSI TS 102 821.
- Also see ETSI TS 102 693
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include <vector>
-#include <stdint.h>
-#include "TagItems.h"
-#include "TagPacket.h"
-
-namespace edi {
-
-typedef std::vector<uint8_t> AFPacket;
-
-// ETSI TS 102 821, 6.1 AF packet structure
-class AFPacketiser
-{
- public:
- AFPacketiser() :
- m_verbose(false) {};
- AFPacketiser(bool verbose) :
- m_verbose(verbose) {};
-
- AFPacket Assemble(TagPacket tag_packet);
-
- private:
- static const bool have_crc = true;
-
- uint16_t seq = 0; //counter that overflows at 0xFFFF
-
- bool m_verbose;
-};
-
-}
-
diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h
deleted file mode 100644
index 55d5f0f..0000000
--- a/src/dabOutput/edi/Config.h
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- UDP and TCP transports and their configuration
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include <vector>
-#include <string>
-#include <memory>
-#include <cstdint>
-
-namespace edi {
-
-/** Configuration for EDI output */
-
-struct destination_t {
- virtual ~destination_t() {};
-};
-
-// Can represent both unicast and multicast destinations
-struct udp_destination_t : public destination_t {
- std::string dest_addr;
- std::string source_addr;
- unsigned int source_port = 0;
- unsigned int ttl = 10;
-};
-
-// TCP server that can accept multiple connections
-struct tcp_destination_t : public destination_t {
- unsigned int listen_port = 0;
- size_t max_frames_queued = 1024;
-};
-
-struct configuration_t {
- unsigned chunk_len = 207; // RSk, data length of each chunk
- unsigned fec = 0; // number of fragments that can be recovered
- bool dump = false; // dump a file with the EDI packets
- bool verbose = false;
- bool enable_pft = false; // Enable protection and fragmentation
- unsigned int tagpacket_alignment = 0;
- std::vector<std::shared_ptr<destination_t> > destinations;
- unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer
- unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms
-
- bool enabled() const { return destinations.size() > 0; }
- bool interleaver_enabled() const { return latency_frames > 0; }
-
- void print() const;
-};
-
-}
-
-
diff --git a/src/dabOutput/edi/Interleaver.cpp b/src/dabOutput/edi/Interleaver.cpp
deleted file mode 100644
index f26a50e..0000000
--- a/src/dabOutput/edi/Interleaver.cpp
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- Interleaving of PFT fragments to increase robustness against
- burst packet loss.
-
- This is possible because EDI has to assume that fragments may reach
- the receiver out of order.
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "Interleaver.h"
-#include <cassert>
-
-namespace edi {
-
-void Interleaver::SetLatency(size_t latency_frames)
-{
- m_latency = latency_frames;
-}
-
-Interleaver::fragment_vec Interleaver::Interleave(fragment_vec &fragments)
-{
- m_fragment_count = fragments.size();
-
- // Create vectors containing Fcount*latency fragments in total
- // and store them into the deque
- if (m_buffer.empty()) {
- m_buffer.emplace_back();
- }
-
- auto& last_buffer = m_buffer.back();
-
- for (auto& fragment : fragments) {
- const bool last_buffer_is_complete =
- (last_buffer.size() >= m_fragment_count * m_latency);
-
- if (last_buffer_is_complete) {
- m_buffer.emplace_back();
- last_buffer = m_buffer.back();
- }
-
- last_buffer.push_back(std::move(fragment));
- }
-
- fragments.clear();
-
- while ( not m_buffer.empty() and
- (m_buffer.front().size() >= m_fragment_count * m_latency)) {
-
- auto& first_buffer = m_buffer.front();
-
- assert(first_buffer.size() == m_fragment_count * m_latency);
-
- /* Assume we have 5 fragments per AF frame, and latency of 3.
- * This will give the following strides:
- * 0 1 2
- * +-------+-------+---+
- * | 0 1 | 2 3 | 4 |
- * | | +---+ |
- * | 5 6 | 7 | 8 9 |
- * | +---+ | |
- * |10 |11 12 |13 14 |
- * +---+-------+-------+
- *
- * ix will be 0, 5, 10, 1, 6 in the first loop
- */
-
- for (size_t i = 0; i < m_fragment_count; i++) {
- const size_t ix = m_interleave_offset + m_fragment_count * m_stride;
- m_interleaved_fragments.push_back(first_buffer.at(ix));
-
- m_stride += 1;
- if (m_stride >= m_latency) {
- m_interleave_offset++;
- m_stride = 0;
- }
- }
-
- if (m_interleave_offset >= m_fragment_count) {
- m_interleave_offset = 0;
- m_stride = 0;
- m_buffer.pop_front();
- }
- }
-
- std::vector<PFTFragment> interleaved_frags;
-
- const size_t n = std::min(m_fragment_count, m_interleaved_fragments.size());
- std::move(m_interleaved_fragments.begin(),
- m_interleaved_fragments.begin() + n,
- std::back_inserter(interleaved_frags));
- m_interleaved_fragments.erase(
- m_interleaved_fragments.begin(),
- m_interleaved_fragments.begin() + n);
-
- return interleaved_frags;
-}
-
-}
-
-
diff --git a/src/dabOutput/edi/Interleaver.h b/src/dabOutput/edi/Interleaver.h
deleted file mode 100644
index f1cff30..0000000
--- a/src/dabOutput/edi/Interleaver.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- Interleaving of PFT fragments to increase robustness against
- burst packet loss.
-
- This is possible because EDI has to assume that fragments may reach
- the receiver out of order.
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include <vector>
-#include <deque>
-#include <stdexcept>
-#include <stdint.h>
-#include "Log.h"
-#include "PFT.h"
-
-namespace edi {
-
-class Interleaver {
- public:
- using fragment_vec = std::vector<PFTFragment>;
-
- /* Configure the interleaver to use latency_frames number of AF
- * packets for interleaving. Total delay through the interleaver
- * will be latency_frames * 24ms
- */
- void SetLatency(size_t latency_frames);
-
- /* Move the fragments for an AF Packet into the interleaver and
- * return interleaved fragments to be transmitted.
- */
- fragment_vec Interleave(fragment_vec &fragments);
-
- private:
- size_t m_latency = 0;
- size_t m_fragment_count = 0;
- size_t m_interleave_offset = 0;
- size_t m_stride = 0;
-
- /* Buffer that accumulates enough fragments to interleave */
- std::deque<fragment_vec> m_buffer;
-
- /* Buffer that contains fragments that have been interleaved,
- * to avoid that the interleaver output is too bursty
- */
- std::deque<PFTFragment> m_interleaved_fragments;
-};
-
-}
-
diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp
deleted file mode 100644
index 5b93016..0000000
--- a/src/dabOutput/edi/PFT.cpp
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- Protection, Fragmentation and Transport. (PFT)
-
- Are supported:
- Reed-Solomon and Fragmentation
-
- This implements part of PFT as defined ETSI TS 102 821.
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "config.h"
-#include <vector>
-#include <list>
-#include <cstdio>
-#include <cstring>
-#include <stdint.h>
-#include <arpa/inet.h>
-#include <stdexcept>
-#include <sstream>
-#include "PFT.h"
-#include "crc.h"
-#include "ReedSolomon.h"
-
-namespace edi {
-
-using namespace std;
-
-// An integer division that rounds up, i.e. ceil(a/b)
-#define CEIL_DIV(a, b) (a % b == 0 ? a / b : a / b + 1)
-
-PFT::PFT() { }
-
-PFT::PFT(const configuration_t &conf) :
- m_k(conf.chunk_len),
- m_m(conf.fec),
- m_dest_port(conf.dest_port),
- m_pseq(0),
- m_num_chunks(0),
- m_verbose(conf.verbose)
- {
- if (m_k > 207) {
- etiLog.level(warn) <<
- "EDI PFT: maximum chunk size is 207.";
- throw std::out_of_range("EDI PFT Chunk size too large.");
- }
-
- if (m_m > 5) {
- etiLog.level(warn) <<
- "EDI PFT: high number of recoverable fragments"
- " may lead to large overhead";
- // See TS 102 821, 7.2.1 Known values, list entry for 'm'
- }
- }
-
-RSBlock PFT::Protect(AFPacket af_packet)
-{
- RSBlock rs_block;
-
- // number of chunks is ceil(afpacketsize / m_k)
- // TS 102 821 7.2.2: c = ceil(l / k_max)
- m_num_chunks = CEIL_DIV(af_packet.size(), m_k);
-
- if (m_verbose) {
- fprintf(stderr, "Protect %zu chunks of size %zu\n",
- m_num_chunks, af_packet.size());
- }
-
- // calculate size of chunk:
- // TS 102 821 7.2.2: k = ceil(l / c)
- // chunk_len does not include the 48 bytes of protection.
- const size_t chunk_len = CEIL_DIV(af_packet.size(), m_num_chunks);
- if (chunk_len > 207) {
- std::stringstream ss;
- ss << "Chunk length " << chunk_len << " too large (>207)";
- throw std::runtime_error(ss.str());
- }
-
- // The last RS chunk is zero padded
- // TS 102 821 7.2.2: z = c*k - l
- const size_t zero_pad = m_num_chunks * chunk_len - af_packet.size();
-
- // Create the RS(k+p,k) encoder
- const int firstRoot = 1; // Discovered by analysing EDI dump
- const int gfPoly = 0x11d;
- const bool reverse = false;
- // The encoding has to be 255, 207 always, because the chunk has to
- // be padded at the end, and not at the beginning as libfec would
- // do
- ReedSolomon rs_encoder(255, 207, reverse, gfPoly, firstRoot);
-
- // add zero padding to last chunk
- for (size_t i = 0; i < zero_pad; i++) {
- af_packet.push_back(0);
- }
-
- if (m_verbose) {
- fprintf(stderr, " add %zu zero padding\n", zero_pad);
- }
-
- // Calculate RS for each chunk and assemble RS block
- for (size_t i = 0; i < af_packet.size(); i+= chunk_len) {
- vector<uint8_t> chunk(207);
- vector<uint8_t> protection(PARITYBYTES);
-
- // copy chunk_len bytes into new chunk
- memcpy(&chunk.front(), &af_packet[i], chunk_len);
-
- // calculate RS for chunk with padding
- rs_encoder.encode(&chunk.front(), &protection.front(), 207);
-
- // Drop the padding
- chunk.resize(chunk_len);
-
- // append new chunk and protection to the RS Packet
- rs_block.insert(rs_block.end(), chunk.begin(), chunk.end());
- rs_block.insert(rs_block.end(), protection.begin(), protection.end());
- }
-
- return rs_block;
-}
-
-vector< vector<uint8_t> > PFT::ProtectAndFragment(AFPacket af_packet)
-{
- const bool enable_RS = (m_m > 0);
-
- if (enable_RS) {
- RSBlock rs_block = Protect(af_packet);
-
-#if 0
- fprintf(stderr, " af_packet (%zu):", af_packet.size());
- for (size_t i = 0; i < af_packet.size(); i++) {
- fprintf(stderr, "%02x ", af_packet[i]);
- }
- fprintf(stderr, "\n");
-
- fprintf(stderr, " rs_block (%zu):", rs_block.size());
- for (size_t i = 0; i < rs_block.size(); i++) {
- fprintf(stderr, "%02x ", rs_block[i]);
- }
- fprintf(stderr, "\n");
-#endif
-
- // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h))
- const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1);
-
- // Calculate fragment count and size
- // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max)
- // l + c*p + z = length of RS block
- const size_t num_fragments = CEIL_DIV(rs_block.size(), max_payload_size);
-
- // TS 102 821 7.2.2: ceil((l + c*p + z) / f)
- const size_t fragment_size = CEIL_DIV(rs_block.size(), num_fragments);
-
- if (m_verbose)
- fprintf(stderr, " PnF fragment_size %zu, num frag %zu\n",
- fragment_size, num_fragments);
-
- vector< vector<uint8_t> > fragments(num_fragments);
-
- for (size_t i = 0; i < num_fragments; i++) {
- fragments[i].resize(fragment_size);
- for (size_t j = 0; j < fragment_size; j++) {
- const size_t ix = j*num_fragments + i;
- if (ix < rs_block.size()) {
- fragments[i][j] = rs_block[ix];
- }
- else {
- fragments[i][j] = 0;
- }
- }
- }
-
- return fragments;
- }
- else { // No RS, only fragmentation
- // TS 102 821 7.2.2: s_max = MTU - h
- // Ethernet MTU is 1500, but maybe you are routing over a network which
- // has some sort of packet encapsulation. Add some margin.
- const size_t max_payload_size = 1400;
-
- // Calculate fragment count and size
- // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max)
- // l + c*p + z = length of AF packet
- const size_t num_fragments = CEIL_DIV(af_packet.size(), max_payload_size);
-
- // TS 102 821 7.2.2: ceil((l + c*p + z) / f)
- const size_t fragment_size = CEIL_DIV(af_packet.size(), num_fragments);
- vector< vector<uint8_t> > fragments(num_fragments);
-
- for (size_t i = 0; i < num_fragments; i++) {
- fragments[i].reserve(fragment_size);
-
- for (size_t j = 0; j < fragment_size; j++) {
- const size_t ix = i*fragment_size + j;
- if (ix < af_packet.size()) {
- fragments[i].push_back(af_packet.at(ix));
- }
- else {
- break;
- }
- }
- }
-
- return fragments;
- }
-}
-
-std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)
-{
- vector< vector<uint8_t> > fragments = ProtectAndFragment(af_packet);
- vector< vector<uint8_t> > pft_fragments; // These contain PF headers
-
- const bool enable_RS = (m_m > 0);
- const bool enable_transport = true;
-
- unsigned int findex = 0;
-
- unsigned fcount = fragments.size();
-
- // calculate size of chunk:
- // TS 102 821 7.2.2: k = ceil(l / c)
- // chunk_len does not include the 48 bytes of protection.
- const size_t chunk_len = enable_RS ?
- CEIL_DIV(af_packet.size(), m_num_chunks) : 0;
-
- // The last RS chunk is zero padded
- // TS 102 821 7.2.2: z = c*k - l
- const size_t zero_pad = enable_RS ?
- m_num_chunks * chunk_len - af_packet.size() : 0;
-
- for (const auto &fragment : fragments) {
- // Psync
- std::string psync("PF");
- std::vector<uint8_t> packet(psync.begin(), psync.end());
-
- // Pseq
- packet.push_back(m_pseq >> 8);
- packet.push_back(m_pseq & 0xFF);
-
- // Findex
- packet.push_back(findex >> 16);
- packet.push_back(findex >> 8);
- packet.push_back(findex & 0xFF);
- findex++;
-
- // Fcount
- packet.push_back(fcount >> 16);
- packet.push_back(fcount >> 8);
- packet.push_back(fcount & 0xFF);
-
- // RS (1 bit), transport (1 bit) and Plen (14 bits)
- unsigned int plen = fragment.size();
- if (enable_RS) {
- plen |= 0x8000; // Set FEC bit
- }
-
- if (enable_transport) {
- plen |= 0x4000; // Set ADDR bit
- }
-
- packet.push_back(plen >> 8);
- packet.push_back(plen & 0xFF);
-
- if (enable_RS) {
- packet.push_back(chunk_len); // RSk
- packet.push_back(zero_pad); // RSz
- }
-
- if (enable_transport) {
- // Source (16 bits)
- uint16_t addr_source = 0;
- packet.push_back(addr_source >> 8);
- packet.push_back(addr_source & 0xFF);
-
- // Dest (16 bits)
- packet.push_back(m_dest_port >> 8);
- packet.push_back(m_dest_port & 0xFF);
- }
-
- // calculate CRC over AF Header and payload
- uint16_t crc = 0xffff;
- crc = crc16(crc, &(packet.front()), packet.size());
- crc ^= 0xffff;
-
- packet.push_back((crc >> 8) & 0xFF);
- packet.push_back(crc & 0xFF);
-
- // insert payload, must have a length multiple of 8 bytes
- packet.insert(packet.end(), fragment.begin(), fragment.end());
-
- pft_fragments.push_back(packet);
-
-#if 0
- fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n",
- m_pseq, findex, fcount, plen & ~0x8000);
-#endif
- }
-
- m_pseq++;
-
- return pft_fragments;
-}
-
-}
-
diff --git a/src/dabOutput/edi/PFT.h b/src/dabOutput/edi/PFT.h
deleted file mode 100644
index 4076bf3..0000000
--- a/src/dabOutput/edi/PFT.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- Protection, Fragmentation and Transport. (PFT)
-
- Are supported:
- Reed-Solomon and Fragmentation
-
- This implements part of PFT as defined ETSI TS 102 821.
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include <vector>
-#include <list>
-#include <stdexcept>
-#include <stdint.h>
-#include "AFPacket.h"
-#include "Log.h"
-#include "ReedSolomon.h"
-#include "dabOutput/edi/Config.h"
-
-namespace edi {
-
-typedef std::vector<uint8_t> RSBlock;
-typedef std::vector<uint8_t> PFTFragment;
-
-class PFT
-{
- public:
- static constexpr int PARITYBYTES = 48;
-
- PFT();
- PFT(const configuration_t& conf);
-
- // return a list of PFT fragments with the correct
- // PFT headers
- std::vector< PFTFragment > Assemble(AFPacket af_packet);
-
- // Apply Reed-Solomon FEC to the AF Packet
- RSBlock Protect(AFPacket af_packet);
-
- // Cut a RSBlock into several fragments that can be transmitted
- std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet);
-
- private:
- unsigned int m_k = 207; // length of RS data word
- unsigned int m_m = 3; // number of fragments that can be recovered if lost
- unsigned int m_dest_port = 12000; // Destination port for transport header
- uint16_t m_pseq = 0;
- size_t m_num_chunks = 0;
- bool m_verbose = 0;
-};
-
-}
-
diff --git a/src/dabOutput/edi/TagItems.cpp b/src/dabOutput/edi/TagItems.cpp
deleted file mode 100644
index dfb4934..0000000
--- a/src/dabOutput/edi/TagItems.cpp
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- EDI output.
- This defines a few TAG items as defined ETSI TS 102 821 and
- ETSI TS 102 693
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "config.h"
-#include "TagItems.h"
-#include <vector>
-#include <iostream>
-#include <string>
-#include <stdint.h>
-#include <stdexcept>
-
-namespace edi {
-
-std::vector<uint8_t> TagStarPTR::Assemble()
-{
- //std::cerr << "TagItem *ptr" << std::endl;
- std::string pack_data("*ptr");
- std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
-
- packet.push_back(0);
- packet.push_back(0);
- packet.push_back(0);
- packet.push_back(0x40);
-
- std::string protocol("DETI");
- packet.insert(packet.end(), protocol.begin(), protocol.end());
-
- // Major
- packet.push_back(0);
- packet.push_back(0);
-
- // Minor
- packet.push_back(0);
- packet.push_back(0);
- return packet;
-}
-
-std::vector<uint8_t> TagDETI::Assemble()
-{
- std::string pack_data("deti");
- std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
- packet.reserve(256);
-
- // Placeholder for length
- packet.push_back(0);
- packet.push_back(0);
- packet.push_back(0);
- packet.push_back(0);
-
- uint8_t fct = dlfc % 250;
- uint8_t fcth = dlfc / 250;
-
-
- uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15);
- packet.push_back(detiHeader >> 8);
- packet.push_back(detiHeader & 0xFF);
-
- uint32_t etiHeader = mnsc | (rfu << 16) | (rfa << 17) |
- (fp << 19) | (mid << 22) | (stat << 24);
- packet.push_back((etiHeader >> 24) & 0xFF);
- packet.push_back((etiHeader >> 16) & 0xFF);
- packet.push_back((etiHeader >> 8) & 0xFF);
- packet.push_back(etiHeader & 0xFF);
-
- if (atstf) {
- packet.push_back(utco);
-
- packet.push_back((seconds >> 24) & 0xFF);
- packet.push_back((seconds >> 16) & 0xFF);
- packet.push_back((seconds >> 8) & 0xFF);
- packet.push_back(seconds & 0xFF);
-
- packet.push_back((tsta >> 16) & 0xFF);
- packet.push_back((tsta >> 8) & 0xFF);
- packet.push_back(tsta & 0xFF);
- }
-
- if (ficf) {
- for (size_t i = 0; i < fic_length; i++) {
- packet.push_back(fic_data[i]);
- }
- }
-
- if (rfudf) {
- packet.push_back((rfud >> 16) & 0xFF);
- packet.push_back((rfud >> 8) & 0xFF);
- packet.push_back(rfud & 0xFF);
- }
-
- // calculate and update size
- // remove TAG name and TAG length fields and convert to bits
- uint32_t taglength = (packet.size() - 8) * 8;
-
- // write length into packet
- packet[4] = (taglength >> 24) & 0xFF;
- packet[5] = (taglength >> 16) & 0xFF;
- packet[6] = (taglength >> 8) & 0xFF;
- packet[7] = taglength & 0xFF;
-
- dlfc = (dlfc+1) % 5000;
-
- /*
- std::cerr << "TagItem deti, packet.size " << packet.size() << std::endl;
- std::cerr << " fic length " << fic_length << std::endl;
- std::cerr << " length " << taglength / 8 << std::endl;
- */
- return packet;
-}
-
-void TagDETI::set_edi_time(const std::time_t t, int tai_utc_offset)
-{
- utco = tai_utc_offset - 32;
-
- const std::time_t posix_timestamp_1_jan_2000 = 946684800;
-
- seconds = t - posix_timestamp_1_jan_2000 + utco;
-}
-
-std::vector<uint8_t> TagESTn::Assemble()
-{
- std::string pack_data("est");
- std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
- packet.reserve(mst_length*8 + 16);
-
- packet.push_back(id);
-
- // Placeholder for length
- packet.push_back(0);
- packet.push_back(0);
- packet.push_back(0);
- packet.push_back(0);
-
- if (tpl > 0x3F) {
- throw std::runtime_error("TagESTn: invalid TPL value");
- }
-
- if (sad > 0x3FF) {
- throw std::runtime_error("TagESTn: invalid SAD value");
- }
-
- if (scid > 0x3F) {
- throw std::runtime_error("TagESTn: invalid SCID value");
- }
-
- uint32_t sstc = (scid << 18) | (sad << 8) | (tpl << 2) | rfa;
- packet.push_back((sstc >> 16) & 0xFF);
- packet.push_back((sstc >> 8) & 0xFF);
- packet.push_back(sstc & 0xFF);
-
- for (size_t i = 0; i < mst_length * 8; i++) {
- packet.push_back(mst_data[i]);
- }
-
- // calculate and update size
- // remove TAG name and TAG length fields and convert to bits
- uint32_t taglength = (packet.size() - 8) * 8;
-
- // write length into packet
- packet[4] = (taglength >> 24) & 0xFF;
- packet[5] = (taglength >> 16) & 0xFF;
- packet[6] = (taglength >> 8) & 0xFF;
- packet[7] = taglength & 0xFF;
-
- /*
- std::cerr << "TagItem ESTn, length " << packet.size() << std::endl;
- std::cerr << " mst_length " << mst_length << std::endl;
- */
- return packet;
-}
-
-std::vector<uint8_t> TagStarDMY::Assemble()
-{
- std::string pack_data("*dmy");
- std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
-
- packet.resize(4 + 4 + length_);
-
- const uint32_t length_bits = length_ * 8;
-
- packet[4] = (length_bits >> 24) & 0xFF;
- packet[5] = (length_bits >> 16) & 0xFF;
- packet[6] = (length_bits >> 8) & 0xFF;
- packet[7] = length_bits & 0xFF;
-
- // The remaining bytes in the packet are "undefined data"
-
- return packet;
-}
-
-}
-
diff --git a/src/dabOutput/edi/TagItems.h b/src/dabOutput/edi/TagItems.h
deleted file mode 100644
index b29a142..0000000
--- a/src/dabOutput/edi/TagItems.h
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- EDI output.
- This defines a few TAG items as defined ETSI TS 102 821 and
- ETSI TS 102 693
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include "Eti.h"
-#include <vector>
-#include <chrono>
-#include <string>
-#include <stdint.h>
-
-namespace edi {
-
-class TagItem
-{
- public:
- virtual std::vector<uint8_t> Assemble() = 0;
-};
-
-// ETSI TS 102 693, 5.1.1 Protocol type and revision
-class TagStarPTR : public TagItem
-{
- public:
- std::vector<uint8_t> Assemble();
-};
-
-// ETSI TS 102 693, 5.1.3 DAB ETI(LI) Management (deti)
-class TagDETI : public TagItem
-{
- public:
- std::vector<uint8_t> Assemble();
-
- /***** DATA in intermediary format ****/
- // For the ETI Header: must be defined !
- uint8_t stat = 0;
- uint8_t mid = 0;
- uint8_t fp = 0;
- uint8_t rfa = 0;
- uint8_t rfu = 0; // MNSC is valid
- uint16_t mnsc = 0;
- uint16_t dlfc = 0; // modulo 5000 frame counter
-
- // ATST (optional)
- bool atstf = false; // presence of atst data
-
- /* UTCO: Offset (in seconds) between UTC and the Seconds value. The
- * value is expressed as an unsigned 8-bit quantity. As of February
- * 2009, the value shall be 2 and shall change as a result of each
- * modification of the number of leap seconds, as proscribed by
- * International Earth Rotation and Reference Systems Service (IERS).
- *
- * According to Annex F
- * EDI = TAI - 32s (constant)
- * EDI = UTC + UTCO
- * we derive
- * UTCO = TAI-UTC - 32
- * where the TAI-UTC offset is given by the USNO bulletin using
- * the ClockTAI module.
- */
- uint8_t utco = 0;
-
- /* Update the EDI time. t is in UTC */
- void set_edi_time(const std::time_t t, int tai_utc_offset);
-
- /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an
- * unsigned 32-bit quantity. Contrary to POSIX, this value also
- * counts leap seconds.
- */
- uint32_t seconds = 0;
-
- /* TSTA: Shall be the 24 least significant bits of the Time Stamp
- * (TIST) field from the STI-D(LI) Frame. The full definition for the
- * STI TIST can be found in annex B of EN 300 797 [4]. The most
- * significant 8 bits of the TIST field of the incoming STI-D(LI)
- * frame, if required, may be carried in the RFAD field.
- */
- uint32_t tsta = 0xFFFFFF;
-
- // the FIC (optional)
- bool ficf = false;
- const unsigned char* fic_data;
- size_t fic_length;
-
- // rfu
- bool rfudf = false;
- uint32_t rfud = 0;
-
-
-};
-
-// ETSI TS 102 693, 5.1.5 ETI Sub-Channel Stream <n>
-class TagESTn : public TagItem
-{
- public:
- std::vector<uint8_t> Assemble();
-
- // SSTCn
- uint8_t scid;
- uint16_t sad;
- uint8_t tpl;
- uint8_t rfa;
-
- // Pointer to MSTn data
- uint8_t* mst_data;
- size_t mst_length; // STLn * 8 bytes
-
- uint8_t id;
-};
-
-// ETSI TS 102 821, 5.2.2.2 Dummy padding
-class TagStarDMY : public TagItem
-{
- public:
- /* length is the TAG value length in bytes */
- TagStarDMY(uint32_t length) : length_(length) {}
- std::vector<uint8_t> Assemble();
-
- private:
- uint32_t length_;
-};
-
-}
-
diff --git a/src/dabOutput/edi/TagPacket.cpp b/src/dabOutput/edi/TagPacket.cpp
deleted file mode 100644
index b16dc33..0000000
--- a/src/dabOutput/edi/TagPacket.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- Copyright (C) 2014
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output.
- This defines a TAG Packet.
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "config.h"
-#include "Eti.h"
-#include "TagPacket.h"
-#include "TagItems.h"
-#include <vector>
-#include <iostream>
-#include <string>
-#include <list>
-#include <stdint.h>
-#include <cassert>
-
-namespace edi {
-
-TagPacket::TagPacket(unsigned int alignment) : m_alignment(alignment)
-{ }
-
-std::vector<uint8_t> TagPacket::Assemble()
-{
- std::list<TagItem*>::iterator tag;
-
- std::vector<uint8_t> packet;
-
- //std::cerr << "Assemble TAGPacket" << std::endl;
-
- for (tag = tag_items.begin(); tag != tag_items.end(); ++tag) {
- std::vector<uint8_t> tag_data = (*tag)->Assemble();
- packet.insert(packet.end(), tag_data.begin(), tag_data.end());
-
- //std::cerr << " Add TAGItem of length " << tag_data.size() << std::endl;
- }
-
- if (m_alignment == 0) { /* no padding */ }
- else if (m_alignment == 8) {
- // Add padding inside TAG packet
- while (packet.size() % 8 > 0) {
- packet.push_back(0); // TS 102 821, 5.1, "padding shall be undefined"
- }
- }
- else if (m_alignment > 8) {
- TagStarDMY dmy(m_alignment - 8);
- auto dmy_data = dmy.Assemble();
- packet.insert(packet.end(), dmy_data.begin(), dmy_data.end());
- }
- else {
- std::cerr << "Invalid alignment requirement " << m_alignment <<
- " defined in TagPacket" << std::endl;
- }
-
- return packet;
-}
-
-}
-
diff --git a/src/dabOutput/edi/TagPacket.h b/src/dabOutput/edi/TagPacket.h
deleted file mode 100644
index a861cbb..0000000
--- a/src/dabOutput/edi/TagPacket.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- Copyright (C) 2014
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output.
- This defines a TAG Packet.
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include "TagItems.h"
-#include <vector>
-#include <string>
-#include <list>
-#include <stdint.h>
-
-namespace edi {
-
-// A TagPacket is nothing else than a list of tag items, with an
-// Assemble function that puts the bytestream together and adds
-// padding such that the total length is a multiple of 8 Bytes.
-//
-// ETSI TS 102 821, 5.1 Tag Packet
-class TagPacket
-{
- public:
- TagPacket(unsigned int alignment);
- std::vector<uint8_t> Assemble();
-
- std::list<TagItem*> tag_items;
-
- private:
- unsigned int m_alignment;
-};
-
-}
-
diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp
deleted file mode 100644
index d99e987..0000000
--- a/src/dabOutput/edi/Transport.cpp
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- UDP and TCP transports and their configuration
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "Transport.h"
-#include <iterator>
-
-using namespace std;
-
-namespace edi {
-
-void configuration_t::print() const
-{
- etiLog.level(info) << "EDI";
- etiLog.level(info) << " verbose " << verbose;
- for (auto edi_dest : destinations) {
- if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port;
- if (not udp_dest->source_addr.empty()) {
- etiLog.level(info) << " source " << udp_dest->source_addr;
- etiLog.level(info) << " ttl " << udp_dest->ttl;
- }
- etiLog.level(info) << " source port " << udp_dest->source_port;
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
- etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
- etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
- }
- else {
- throw std::logic_error("EDI destination not implemented");
- }
- }
- if (interleaver_enabled()) {
- etiLog.level(info) << " interleave " << latency_frames * 24 << " ms";
- }
-}
-
-
-Sender::Sender(const configuration_t& conf) :
- m_conf(conf),
- edi_pft(m_conf)
-{
- if (m_conf.verbose) {
- etiLog.log(info, "Setup EDI");
- }
-
- for (const auto& edi_dest : m_conf.destinations) {
- if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port);
-
- if (not udp_dest->source_addr.empty()) {
- int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = udp_socket->setMulticastTTL(udp_dest->ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
- }
-
- udp_sockets.emplace(udp_dest.get(), udp_socket);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
- auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued);
- dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
- tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
- }
- else {
- throw std::logic_error("EDI destination not implemented");
- }
- }
-
- if (m_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(m_conf.latency_frames);
- }
-
- if (m_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (m_conf.verbose) {
- etiLog.log(info, "EDI set up");
- }
-}
-
-void Sender::write(const TagPacket& tagpacket)
-{
- // Assemble into one AF Packet
- edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
-
- if (m_conf.enable_pft) {
- // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
-
- if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
- edi_fragments.size());
- }
-
- if (m_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- // Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
-
- udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
- }
- else {
- throw std::logic_error("EDI destination not implemented");
- }
- }
-
- if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
- edi_fragments.size());
- }
- }
- else {
- // Send over ethernet
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
-
- udp_sockets.at(udp_dest.get())->send(af_packet, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
- }
- else {
- throw std::logic_error("EDI destination not implemented");
- }
- }
-
- if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
- }
-}
-
-}
diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h
deleted file mode 100644
index 7b0a0db..0000000
--- a/src/dabOutput/edi/Transport.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
-
- EDI output,
- UDP and TCP transports and their configuration
-
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "config.h"
-#include "dabOutput/edi/Config.h"
-#include "AFPacket.h"
-#include "PFT.h"
-#include "Interleaver.h"
-#include <vector>
-#include <unordered_map>
-#include <stdexcept>
-#include <cstdint>
-#include "dabOutput/dabOutput.h"
-
-namespace edi {
-
-/** Configuration for EDI output */
-
-class Sender {
- public:
- Sender(const configuration_t& conf);
-
- void write(const TagPacket& tagpacket);
-
- private:
- configuration_t m_conf;
- std::ofstream edi_debug_file;
-
- // The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
-
- // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
- edi::Interleaver edi_interleaver;
-
- std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets;
- std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers;
-};
-
-}
-
diff --git a/src/dabOutput/metadata.h b/src/dabOutput/metadata.h
index ed16de5..34e146f 100644
--- a/src/dabOutput/metadata.h
+++ b/src/dabOutput/metadata.h
@@ -54,6 +54,8 @@ enum class output_metadata_id_e {
};
struct OutputMetadata {
+ virtual ~OutputMetadata() {};
+
virtual output_metadata_id_e getId(void) const = 0;
virtual size_t getLength(void) const = 0;
diff --git a/src/fig/FIG0_19.cpp b/src/fig/FIG0_19.cpp
index f032bd5..5b6a384 100644
--- a/src/fig/FIG0_19.cpp
+++ b/src/fig/FIG0_19.cpp
@@ -109,6 +109,19 @@ FillStatus FIG0_19::fill(uint8_t *buf, size_t max_size)
else {
fig0_19->ASw = 0;
}
+
+ /* From the crc-mmbtools google groups, 2019-07-11, L. Cornell:
+ *
+ * Long ago, there was a defined use for the New flag - it was intended
+ * to indicate whether the announcement was new or was a repeated
+ * announcement. But the problem is that it doesn't really help
+ * receivers because they might tune to the ensemble at any time, and
+ * might tune to a service that may be interrupted at any time. So
+ * some years ago it was decided that the New flag would not longer be
+ * used in transmissions. The setting was fixed to be 1 because some
+ * receivers would have never switched to the announcement if the flag
+ * was set to 0.
+ */
fig0_19->NewFlag = 1;
fig0_19->RegionFlag = 0;
fig0_19->SubChId = 0;
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
new file mode 100644
index 0000000..b5301d2
--- /dev/null
+++ b/src/input/Edi.cpp
@@ -0,0 +1,427 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Edi.h"
+
+#include <regex>
+#include <chrono>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <cmath>
+#include <cstdlib>
+#include <cerrno>
+#include <climits>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+constexpr bool VERBOSE = false;
+constexpr size_t TCP_BLOCKSIZE = 2048;
+
+Edi::Edi(const std::string& name, const dab_input_edi_config_t& config) :
+ RemoteControllable(name),
+ m_tcp_receive_server(TCP_BLOCKSIZE),
+ m_sti_writer(bind(&Edi::m_new_sti_frame_callback, this, placeholders::_1)),
+ m_sti_decoder(m_sti_writer, VERBOSE),
+ m_max_frames_overrun(config.buffer_size),
+ m_num_frames_prebuffering(config.prebuffering),
+ m_name(name),
+ m_stats(name)
+{
+ RC_ADD_PARAMETER(buffermanagement,
+ "Set type of buffer management to use [prebuffering, timestamped]");
+
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [24ms frames]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [24ms frames]");
+
+ RC_ADD_PARAMETER(tistdelay, "TIST delay to add [ms]");
+}
+
+Edi::~Edi() {
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void Edi::open(const std::string& name)
+{
+ const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_tcp("tcp://(.*):([0-9]+)");
+
+ lock_guard<mutex> lock(m_mutex);
+
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+
+ std::smatch m;
+ if (std::regex_match(name, m, re_udp)) {
+ const int udp_port = std::stoi(m[1].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port);
+ m_udp_sock.setBlocking(false);
+ // TODO multicast
+ }
+ else if (std::regex_match(name, m, re_tcp)) {
+ m_input_used = InputUsed::TCP;
+ const string addr = m[1].str();
+ const int tcp_port = std::stoi(m[2].str());
+ m_tcp_receive_server.start(tcp_port, addr);
+ }
+ else {
+ throw runtime_error("Cannot parse EDI input URI");
+ }
+
+ m_stats.registerAtServer();
+
+ m_running = true;
+ m_thread = std::thread(&Edi::m_run, this);
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size)
+{
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frames.size() * size);
+
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ else if (not m_pending_sti_frame.frame.empty()) {
+ // Can only happen when switching from timestamp-based buffer management!
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right);
+
+ copy(m_pending_sti_frame.frame.begin(),
+ m_pending_sti_frame.frame.end(),
+ buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ }
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
+ etiLog.level(debug) << "EDI input " << m_name << " empty frame";
+ return 0;
+ }
+ else if (sti.frame.size() == size) {
+ // Steady-state when everything works well
+ if (m_frames.size() > m_max_frames_overrun) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length. */
+ size_t over_max = m_frames.size() - m_num_frames_prebuffering;
+
+ while (over_max--) {
+ EdiDecoder::sti_frame_t discard;
+ m_frames.try_pop(discard);
+ }
+ }
+
+ if (not sti.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ sti.version_data.version,
+ sti.version_data.uptime_s);
+ }
+ m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right);
+
+ copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ return size;
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ sti.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ return 0;
+ }
+ }
+ else {
+ memset(buffer, 0, size * sizeof(*buffer));
+ m_is_prebuffering = true;
+ etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ m_stats.notifyUnderrun();
+ return 0;
+ }
+}
+
+size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ m_stats.notifyBuffer(m_frames.size() * size);
+
+ if (m_is_prebuffering) {
+ if (m_pending_sti_frame.frame.empty()) {
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else if (m_pending_sti_frame.frame.size() == size) {
+ // readFrame gets called every 24ms, so we allow max 24ms
+ // difference between the input frame timestamp and the requested
+ // timestamp.
+ if (m_pending_sti_frame.timestamp.valid()) {
+ auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
+ ts_req += m_tist_delay;
+ const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
+
+ if (offset < 24e-3) {
+ m_is_prebuffering = false;
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " valid timestamp, pre-buffering complete";
+
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ else {
+ // Wait more, but erase the front of the frame queue to avoid
+ // stalling on one frame with incorrect timestamp
+ if (m_frames.size() >= m_max_frames_overrun) {
+ m_pending_sti_frame.frame.clear();
+ }
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name <<
+ " skipping frame without timestamp";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ m_pending_sti_frame.frame.clear();
+ memset(buffer, 0, size);
+ return 0;
+ }
+ }
+ else {
+ if (m_pending_sti_frame.frame.empty()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " empty, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
+ return 0;
+ }
+ else if (not m_pending_sti_frame.timestamp.valid()) {
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " invalid timestamp, ignoring";
+ memset(buffer, 0, size);
+ m_pending_sti_frame.frame.clear();
+ m_stats.notifyUnderrun();
+ return 0;
+ }
+ else {
+ auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
+ ts_req += m_tist_delay;
+ const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req);
+
+ if (offset > 24e-3) {
+ m_stats.notifyUnderrun();
+ m_is_prebuffering = true;
+ m_pending_sti_frame.frame.clear();
+ etiLog.level(warn) << "EDI input " << m_name <<
+ " timestamp out of bounds, re-enabling pre-buffering";
+ memset(buffer, 0, size);
+ return 0;
+ }
+ else {
+ if (not m_pending_sti_frame.version_data.version.empty()) {
+ m_stats.notifyVersion(
+ m_pending_sti_frame.version_data.version,
+ m_pending_sti_frame.version_data.uptime_s);
+ }
+
+ m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left,
+ m_pending_sti_frame.audio_levels.right);
+ copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ return size;
+ }
+ }
+ }
+}
+
+void Edi::m_run()
+{
+ while (m_running) {
+ switch (m_input_used) {
+ case InputUsed::UDP:
+ {
+ constexpr size_t packsize = 2048;
+ const auto packet = m_udp_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ if (not packet.buffer.empty()) {
+ m_sti_decoder.push_packet(packet.buffer);
+ }
+ else {
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+ break;
+ case InputUsed::TCP:
+ {
+ auto packet = m_tcp_receive_server.receive();
+ if (not packet.empty()) {
+ m_sti_decoder.push_bytes(packet);
+ }
+ else {
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+ break;
+ default:
+ throw logic_error("unimplemented input");
+ }
+ }
+}
+
+void Edi::m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& sti) {
+ if (not sti.frame.empty()) {
+ // We should not wait here, because we want the complete input buffering
+ // happening inside m_frames. Using the blocking function is only a protection
+ // against runaway memory usage if something goes wrong in the consumer.
+ m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2);
+ }
+}
+
+int Edi::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
+ }
+
+ return bitrate;
+}
+
+void Edi::close()
+{
+ m_udp_sock.close();
+}
+
+
+void Edi::set_parameter(const std::string& parameter, const std::string& value)
+{
+ if (parameter == "buffer") {
+ size_t new_limit = atol(value.c_str());
+ m_max_frames_overrun = new_limit;
+ }
+ else if (parameter == "prebuffering") {
+ size_t new_limit = atol(value.c_str());
+ m_num_frames_prebuffering = new_limit;
+ }
+ else if (parameter == "buffermanagement") {
+ if (value == "prebuffering") {
+ setBufferManagement(Inputs::BufferManagement::Prebuffering);
+ }
+ else if (value == "timestamped") {
+ setBufferManagement(Inputs::BufferManagement::Timestamped);
+ }
+ else {
+ throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name());
+ }
+ }
+ else if (parameter == "tistdelay") {
+ m_tist_delay = chrono::milliseconds(stoi(value));
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+}
+
+const std::string Edi::get_parameter(const std::string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << m_max_frames_overrun;
+ }
+ else if (parameter == "prebuffering") {
+ ss << m_num_frames_prebuffering;
+ }
+ else if (parameter == "buffermanagement") {
+ switch (getBufferManagement()) {
+ case Inputs::BufferManagement::Prebuffering:
+ ss << "prebuffering";
+ break;
+ case Inputs::BufferManagement::Timestamped:
+ ss << "Timestamped";
+ break;
+ }
+ }
+ else if (parameter == "tistdelay") {
+ ss << m_tist_delay.count();
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+ return ss.str();
+}
+
+}
diff --git a/src/input/Edi.h b/src/input/Edi.h
new file mode 100644
index 0000000..ca465bd
--- /dev/null
+++ b/src/input/Edi.h
@@ -0,0 +1,126 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <deque>
+#include <thread>
+#include <mutex>
+#include "Socket.h"
+#include "input/inputs.h"
+#include "edi/STIDecoder.hpp"
+#include "edi/STIWriter.hpp"
+#include "ThreadsafeQueue.h"
+#include "ManagementServer.h"
+
+namespace Inputs {
+
+struct dab_input_edi_config_t
+{
+ /* The size of the internal buffer, measured in number
+ * of elements.
+ *
+ * Each element corresponds to one frame, i.e. 24ms
+ */
+ size_t buffer_size = 100;
+
+ /* The amount of prebuffering to do before we start streaming
+ *
+ * Same units as buffer_size
+ */
+ size_t prebuffering = 30;
+};
+
+/*
+ * Receives EDI from UDP or TCP in a separate thread and pushes that data
+ * into the STIDecoder. Complete frames are then put into a queue for the consumer.
+ *
+ * This way, the EDI decoding happens in a separate thread.
+ */
+class Edi : public InputBase, public RemoteControllable {
+ public:
+ Edi(const std::string& name, const dab_input_edi_config_t& config);
+ Edi(const Edi&) = delete;
+ Edi& operator=(const Edi&) = delete;
+ ~Edi();
+
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
+ virtual int setBitrate(int bitrate);
+ virtual void close();
+
+ /* Remote control */
+ virtual void set_parameter(const std::string& parameter, const std::string& value);
+ virtual const std::string get_parameter(const std::string& parameter) const;
+
+ protected:
+ void m_run();
+
+ void m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& frame);
+
+ std::mutex m_mutex;
+
+ enum class InputUsed { Invalid, UDP, TCP };
+ InputUsed m_input_used = InputUsed::Invalid;
+ Socket::UDPSocket m_udp_sock;
+ Socket::TCPReceiveServer m_tcp_receive_server;
+
+ EdiDecoder::STIWriter m_sti_writer;
+ EdiDecoder::STIDecoder m_sti_decoder;
+ std::thread m_thread;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+
+ // InputBase defines bufferManagement and tist delay
+
+ // Used in timestamp-based buffer management
+ EdiDecoder::sti_frame_t m_pending_sti_frame;
+
+ // State variable used in prebuffering-based buffer management
+ bool m_is_prebuffering = true;
+
+ /* When using prebuffering, consider the buffer to be full on the
+ * receive side if it's above the overrun threshold.
+ *
+ * When using timestamping, start discarding the front of the queue once the queue
+ * is this full. Must be smaller than m_max_frames_queued.
+ *
+ * Parameter 'buffer' inside RC. */
+ std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000);
+
+ /* When not using timestamping, how many frames to prebuffer.
+ * Parameter 'prebuffering' inside RC. */
+ std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10);
+
+ std::string m_name;
+ InputStat m_stats;
+};
+
+};
+
diff --git a/src/input/File.cpp b/src/input/File.cpp
index 20036ae..46bfb59 100644
--- a/src/input/File.cpp
+++ b/src/input/File.cpp
@@ -35,6 +35,8 @@
#include "mpeg.h"
#include "ReedSolomon.h"
+using namespace std;
+
namespace Inputs {
#ifdef _WIN32
@@ -58,7 +60,7 @@ __attribute((packed))
;
-int FileBase::open(const std::string& name)
+void FileBase::open(const std::string& name)
{
int flags = O_RDONLY | O_BINARY;
if (m_nonblock) {
@@ -67,30 +69,35 @@ int FileBase::open(const std::string& name)
m_fd = ::open(name.c_str(), flags);
if (m_fd == -1) {
- throw std::runtime_error("Could not open input file " + name + ": " +
+ throw runtime_error("Could not open input file " + name + ": " +
strerror(errno));
}
+}
+
+size_t FileBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // Will not be implemented, as there is no obvious way to carry timestamps
+ // in files.
+ memset(buffer, 0, size);
return 0;
}
int FileBase::setBitrate(int bitrate)
{
if (bitrate <= 0) {
- etiLog.log(error, "Invalid bitrate (%i)", bitrate);
- return -1;
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate));
}
return bitrate;
}
-int FileBase::close()
+void FileBase::close()
{
if (m_fd != -1) {
::close(m_fd);
m_fd = -1;
}
- return 0;
}
void FileBase::setNonblocking(bool nonblock)
@@ -182,7 +189,7 @@ ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size)
return size;
}
-int MPEGFile::readFrame(uint8_t* buffer, size_t size)
+size_t MPEGFile::readFrame(uint8_t *buffer, size_t size)
{
int result;
bool do_rewind = false;
@@ -275,12 +282,18 @@ MUTE_SUBCHANNEL:
}
}
}
+
+ // TODO this is probably wrong, because it should return
+ // the number of bytes written.
return result;
}
int MPEGFile::setBitrate(int bitrate)
{
- if (bitrate == 0) {
+ if (bitrate < 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate));
+ }
+ else if (bitrate == 0) {
uint8_t buffer[4];
if (readFrame(buffer, 4) == 0) {
@@ -294,7 +307,7 @@ int MPEGFile::setBitrate(int bitrate)
return bitrate;
}
-int RawFile::readFrame(uint8_t* buffer, size_t size)
+size_t RawFile::readFrame(uint8_t *buffer, size_t size)
{
return readFromFile(buffer, size);
}
@@ -304,7 +317,7 @@ PacketFile::PacketFile(bool enhancedPacketMode)
m_enhancedPacketEnabled = enhancedPacketMode;
}
-int PacketFile::readFrame(uint8_t* buffer, size_t size)
+size_t PacketFile::readFrame(uint8_t *buffer, size_t size)
{
size_t written = 0;
int length;
@@ -357,7 +370,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
length = 24;
}
else {
- std::copy(m_packetData.begin(),
+ copy(m_packetData.begin(),
m_packetData.begin() + m_packetLength,
buffer);
length = m_packetLength;
@@ -365,7 +378,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
}
}
else {
- std::copy(m_packetData.begin(),
+ copy(m_packetData.begin(),
m_packetData.begin() + m_packetLength,
buffer);
length = m_packetLength;
diff --git a/src/input/File.h b/src/input/File.h
index b574c39..39ce7fd 100644
--- a/src/input/File.h
+++ b/src/input/File.h
@@ -36,10 +36,11 @@ namespace Inputs {
class FileBase : public InputBase {
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size) = 0;
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
virtual void setNonblocking(bool nonblock);
@@ -63,7 +64,7 @@ class FileBase : public InputBase {
class MPEGFile : public FileBase {
public:
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
virtual int setBitrate(int bitrate);
private:
@@ -72,13 +73,13 @@ class MPEGFile : public FileBase {
class RawFile : public FileBase {
public:
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
};
class PacketFile : public FileBase {
public:
PacketFile(bool enhancedPacketMode);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
protected:
std::array<uint8_t, 96> m_packetData;
diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp
index 7856a46..155e625 100644
--- a/src/input/Prbs.cpp
+++ b/src/input/Prbs.cpp
@@ -44,7 +44,7 @@ namespace Inputs {
// Preferred polynomial is G(x) = x^20 + x^17 + 1
const uint32_t PRBS_DEFAULT_POLY = (1 << 20) | (1 << 17) | (1 << 0);
-int Prbs::open(const string& name)
+void Prbs::open(const string& name)
{
if (name.substr(0, 7) != "prbs://") {
throw logic_error("Invalid PRBS name");
@@ -73,11 +73,9 @@ int Prbs::open(const string& name)
m_prbs.setup(polynomial);
}
rewind();
-
- return 0;
}
-int Prbs::readFrame(uint8_t* buffer, size_t size)
+size_t Prbs::readFrame(uint8_t *buffer, size_t size)
{
for (size_t i = 0; i < size; ++i) {
buffer[i] = m_prbs.step();
@@ -86,14 +84,22 @@ int Prbs::readFrame(uint8_t* buffer, size_t size)
return size;
}
+size_t Prbs::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ memset(buffer, 0, size);
+ return 0;
+}
+
int Prbs::setBitrate(int bitrate)
{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate));
+ }
return bitrate;
}
-int Prbs::close()
+void Prbs::close()
{
- return 0;
}
int Prbs::rewind()
diff --git a/src/input/Prbs.h b/src/input/Prbs.h
index 51b7756..e2b94ec 100644
--- a/src/input/Prbs.h
+++ b/src/input/Prbs.h
@@ -37,10 +37,11 @@ namespace Inputs {
class Prbs : public InputBase {
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
private:
virtual int rewind();
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index 2cb49e7..a37ee21 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -38,7 +38,7 @@ using namespace std;
namespace Inputs {
-int Udp::open(const std::string& name)
+void Udp::open(const std::string& name)
{
// Skip the udp:// part if it is present
const string endpoint = (name.substr(0, 6) == "udp://") ?
@@ -57,8 +57,6 @@ int Udp::open(const std::string& name)
m_name = name;
openUdpSocket(endpoint);
-
- return 0;
}
void Udp::openUdpSocket(const std::string& endpoint)
@@ -82,61 +80,50 @@ void Udp::openUdpSocket(const std::string& endpoint)
throw out_of_range("can't use port number 0 in udp address");
}
- if (m_sock.reinit(port, address) == -1) {
- stringstream ss;
- ss << "Could not init UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (m_sock.setBlocking(false) == -1) {
- stringstream ss;
- ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ m_sock.reinit(port, address);
+ m_sock.setBlocking(false);
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
-int Udp::readFrame(uint8_t* buffer, size_t size)
+size_t Udp::readFrame(uint8_t *buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- std::copy(packet.getData(), packet.getData() + packet.getSize(),
- back_inserter(m_buffer));
+ std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));
// Take data from the buffer if it contains enough data,
// in any case write the buffer
if (m_buffer.size() >= (size_t)size) {
std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer);
+ return size;
}
else {
memset(buffer, 0x0, size);
+ return 0;
}
+}
- return size;
+size_t Udp::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // Maybe there's a way to carry timestamps, but we don't need it.
+ memset(buffer, 0x0, size);
+ return 0;
}
int Udp::setBitrate(int bitrate)
{
if (bitrate <= 0) {
- etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
- return -1;
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
}
return bitrate;
}
-int Udp::close()
+void Udp::close()
{
- return m_sock.close();
+ m_sock.close();
}
@@ -165,10 +152,10 @@ static uint16_t unpack2(const uint8_t *buf)
return (((uint16_t)buf[0]) << 8) | buf[1];
}
-int Sti_d_Rtp::open(const std::string& name)
+void Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -176,43 +163,34 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}
m_name = name;
openUdpSocket(endpoint);
-
- return 0;
}
void Sti_d_Rtp::receive_packet()
{
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
+ auto packet = m_sock.receive(32768);
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (packet.getSize() == 0) {
+ if (packet.buffer.empty()) {
// No packet was received
return;
}
const size_t STI_FC_LEN = 8;
- if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
etiLog.level(info) << "Received too small RTP packet for " <<
m_name;
return;
}
- if (not rtpHeaderValid(packet.getData())) {
+ if (not rtpHeaderValid(packet.buffer.data())) {
etiLog.level(info) << "Received invalid RTP header for " <<
m_name;
return;
@@ -220,7 +198,7 @@ void Sti_d_Rtp::receive_packet()
// STI(PI, X)
size_t index = RTP_HEADER_LEN;
- const uint8_t *buf = packet.getData();
+ const uint8_t *buf = packet.buffer.data();
// SYNC
index++; // Advance over STAT
@@ -242,7 +220,7 @@ void Sti_d_Rtp::receive_packet()
m_name;
return;
}
- if (packet.getSize() < index + DFS) {
+ if (packet.buffer.size() < index + DFS) {
etiLog.level(info) << "Received STI too small for given DFS for " <<
m_name;
return;
@@ -270,9 +248,9 @@ void Sti_d_Rtp::receive_packet()
uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
index += 2;
- if (packet.getSize() < index + 4*NST) {
+ if (packet.buffer.size() < index + 4*NST) {
etiLog.level(info) << "Received STI too small to contain NST for " <<
- m_name << " packet: " << packet.getSize() << " need " <<
+ m_name << " packet: " << packet.buffer.size() << " need " <<
index + 4*NST;
return;
}
@@ -307,7 +285,7 @@ void Sti_d_Rtp::receive_packet()
}
}
-int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
+size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size)
{
// Make sure we fill faster than we consume in case there
// are pending packets.
@@ -316,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
if (m_queue.empty()) {
memset(buffer, 0x0, size);
+ return 0;
}
else if (m_queue.front().size() != size) {
etiLog.level(warn) << "Invalid input data size for STI " << m_name <<
" : RX " << m_queue.front().size() << " expected " << size;
memset(buffer, 0x0, size);
m_queue.pop_front();
+ return 0;
}
else {
copy(m_queue.front().begin(), m_queue.front().end(), buffer);
m_queue.pop_front();
+ return size;
}
-
- return 0;
}
}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index dc01486..e5961c7 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -31,7 +31,7 @@
#include <deque>
#include <boost/thread.hpp>
#include "input/inputs.h"
-#include "UdpSocket.h"
+#include "Socket.h"
namespace Inputs {
@@ -40,13 +40,14 @@ namespace Inputs {
*/
class Udp : public InputBase {
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
protected:
- UdpSocket m_sock;
+ Socket::UDPSocket m_sock;
std::string m_name;
void openUdpSocket(const std::string& endpoint);
@@ -67,8 +68,8 @@ class Sti_d_Rtp : public Udp {
using vec_u8 = std::vector<uint8_t>;
public:
- virtual int open(const std::string& name);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& name);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
private:
void receive_packet(void);
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index 2e35907..0a9d59d 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018 Matthias P. Braendli
+ Copyright (C) 2019 Matthias P. Braendli
http://www.opendigitalradio.org
ZeroMQ input. see www.zeromq.org for more info
@@ -220,7 +220,7 @@ void ZmqBase::rebind()
}
}
-int ZmqBase::open(const std::string& inputUri)
+void ZmqBase::open(const std::string& inputUri)
{
m_inputUri = inputUri;
@@ -229,33 +229,32 @@ int ZmqBase::open(const std::string& inputUri)
// We want to appear in the statistics !
m_stats.registerAtServer();
-
- return 0;
}
-int ZmqBase::close()
+void ZmqBase::close()
{
m_zmq_sock.close();
- return 0;
}
int ZmqBase::setBitrate(int bitrate)
{
+ if (bitrate <= 0) {
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
+ }
+
m_bitrate = bitrate;
- return bitrate; // TODO do a nice check here
+ return bitrate;
}
// size corresponds to a frame size. It is constant for a given bitrate
-int ZmqBase::readFrame(uint8_t* buffer, size_t size)
+size_t ZmqBase::readFrame(uint8_t* buffer, size_t size)
{
- int rc;
-
/* We must *always* read data from the ZMQ socket,
* to make sure that ZMQ internal buffers are emptied
* quickly. It's the only way to control the buffers
* of the whole path from encoder to our frame_buffer.
*/
- rc = readFromSocket(size);
+ const auto readsize = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
if (m_frame_buffer.size() >= m_config.buffer_size) {
@@ -296,10 +295,10 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
}
if (m_prebuf_current > 0) {
- if (rc > 0)
+ if (readsize > 0)
m_prebuf_current--;
if (m_prebuf_current == 0)
- etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
+ etiLog.log(info, "inputZMQ %s input pre-buffering complete",
m_rc_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
@@ -312,7 +311,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
m_stats.notifyBuffer(m_frame_buffer.size() * size);
if (m_frame_buffer.empty()) {
- etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
+ etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering",
m_rc_name.c_str());
// reset prebuffering
m_prebuf_current = m_config.prebuffering;
@@ -332,6 +331,13 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)
}
}
+size_t ZmqBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // TODO add timestamps into the metadata and implement this
+ memset(buffer, 0, size);
+ return 0;
+}
+
/******** MPEG input *******/
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
index eb67fe5..2e37b5f 100644
--- a/src/input/Zmq.h
+++ b/src/input/Zmq.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2017 Matthias P. Braendli
+ Copyright (C) 2019 Matthias P. Braendli
http://www.opendigitalradio.org
ZeroMQ input. see www.zeromq.org for more info
@@ -45,7 +45,7 @@
#include <list>
#include <string>
-#include <stdint.h>
+#include <cstdint>
#include "zmq.hpp"
#include "input/inputs.h"
#include "ManagementServer.h"
@@ -156,6 +156,7 @@ class ZmqBase : public InputBase, public RemoteControllable {
m_bitrate(0),
m_enable_input(true),
m_config(config),
+ m_name(name),
m_stats(name),
m_prebuf_current(config.prebuffering) {
RC_ADD_PARAMETER(enable,
@@ -180,10 +181,11 @@ class ZmqBase : public InputBase, public RemoteControllable {
INVALIDATE_KEY(m_curve_encoder_key);
}
- virtual int open(const std::string& inputUri);
- virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual void open(const std::string& inputUri);
+ virtual size_t readFrame(uint8_t *buffer, size_t size);
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
virtual int setBitrate(int bitrate);
- virtual int close();
+ virtual void close();
/* Remote control */
virtual void set_parameter(const std::string& parameter,
@@ -220,6 +222,7 @@ class ZmqBase : public InputBase, public RemoteControllable {
char m_curve_encoder_key[CURVE_KEYLEN+1];
std::string m_inputUri;
+ std::string m_name;
InputStat m_stats;
diff --git a/src/input/inputs.h b/src/input/inputs.h
index bfb1fb6..83cdbf2 100644
--- a/src/input/inputs.h
+++ b/src/input/inputs.h
@@ -2,7 +2,7 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -35,17 +35,63 @@
namespace Inputs {
+enum class BufferManagement {
+ // Use a buffer in the input that doesn't consider timestamps
+ Prebuffering,
+
+ // Buffer incoming data until a given timestamp is reached
+ Timestamped,
+};
+
+
/* New input object base */
class InputBase {
public:
- virtual int open(const std::string& name) = 0;
- virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ /* Throws runtime_error or invalid_argument on failure */
+ virtual void open(const std::string& name) = 0;
+
+ /* read a frame from the input. Buffer management is either not necessary
+ * (e.g. File input) or done with pre-buffering (network-based inputs).
+ *
+ * This ignores timestamps. All inputs support this.
+ *
+ * Returns number of data bytes written to the buffer. May clear the buffer
+ * if no data bytes available, in which case it will return 0.
+ *
+ * Returns negative on error.
+ */
+ virtual size_t readFrame(uint8_t *buffer, size_t size) = 0;
+
+ /* read a frame from the input, taking into account timestamp. The timestamp of the data
+ * returned is not more recent than the timestamp specified in seconds and tsta.
+ *
+ * seconds is in UNIX epoch, utco is the TAI-UTC offset, tsta is in the format used by ETI.
+ *
+ * Returns number of data bytes written to the buffer. May clear the buffer
+ * if no data bytes available, in which case it will return 0.
+ *
+ * Returns negative on error.
+ *
+ * Calling this function on inputs that do not support timestamps returns 0. This allows
+ * changing the buffer management at runtime without risking an crash due to an exception.
+ */
+ virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) = 0;
+
+ /* Returns the effectively used bitrate, or throws invalid_argument on invalid bitrate */
virtual int setBitrate(int bitrate) = 0;
- virtual int close() = 0;
+ virtual void close() = 0;
virtual ~InputBase() {}
+
+ void setTistDelay(const std::chrono::milliseconds& ms) { m_tist_delay = ms; }
+ void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; }
+ BufferManagement getBufferManagement() const { return m_bufferManagement; }
+
protected:
InputBase() {}
+
+ std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering);
+ std::chrono::milliseconds m_tist_delay;
};
};
diff --git a/src/utils.cpp b/src/utils.cpp
index 721c145..7cd441a 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -25,6 +25,7 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
#include <cstring>
+#include <climits>
#include <iostream>
#include <memory>
#include <boost/algorithm/string/join.hpp>
@@ -328,7 +329,7 @@ void printSubchannels(const vec_sp_subchannel& subchannels)
etiLog.level(info) << " URI: " << subchannel->inputUri;
switch (subchannel->type) {
case subchannel_type_t::DABAudio:
- etiLog.log(info, " type: DAbAudio");
+ etiLog.log(info, " type: DABAudio");
break;
case subchannel_type_t::DABPlusAudio:
etiLog.log(info, " type: DABPlusAudio");
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 2128abf..2188f8a 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -79,7 +79,7 @@ void EDISender::print_configuration()
void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
{
edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
+ edi::TagStarPTR edi_tagStarPtr("DETI");
map<int, edi::TagESTn> edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index bb9c8bc..3525b4b 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -34,9 +34,9 @@
#include <atomic>
#include "ThreadsafeQueue.h"
#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/Transport.h"
+#include "edioutput/TagItems.h"
+#include "edioutput/TagPacket.h"
+#include "edioutput/Transport.h"
// This metadata gets transmitted in the zmq stream
struct metadata_t {
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 3888d8a..f7d733c 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -27,11 +27,13 @@
#include "Log.h"
#include "zmq.hpp"
-#include <math.h>
#include <getopt.h>
-#include <string.h>
+#include <cmath>
+#include <cstring>
+#include <chrono>
#include <iostream>
#include <iterator>
+#include <thread>
#include <vector>
#include "EDISender.h"
@@ -39,12 +41,13 @@
constexpr size_t MAX_ERROR_COUNT = 10;
constexpr long ZMQ_TIMEOUT_MS = 1000;
+constexpr long DEFAULT_BACKOFF = 5000;
static edi::configuration_t edi_conf;
static EDISender edisender;
-void usage(void)
+static void usage()
{
using namespace std;
@@ -54,23 +57,25 @@ void usage(void)
cerr << "Options:" << endl;
cerr << "The following options can be given only once:" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
- cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
- cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
- cerr << " -p <destination port> sets the destination port." << endl;
- cerr << " -P Disable PFT and send AFPackets." << endl;
- cerr << " -f <fec> sets the FEC." << endl;
- cerr << " -i <interleave> enables the interleaved with this latency." << endl;
- cerr << " -D dumps the EDI to edi.debug file." << endl;
- cerr << " -v Enables verbose mode." << endl;
- cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl;
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
+ cerr << " Negative delay values are also allowed." << endl;
+ cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
+ cerr << " -p <destination port> Set the destination port." << endl;
+ cerr << " -P Disable PFT and send AFPackets." << endl;
+ cerr << " -f <fec> Set the FEC." << endl;
+ cerr << " -i <interleave> Enable the interleaver with this latency." << endl;
+ cerr << " -D Dump the EDI to edi.debug file." << endl;
+ cerr << " -v Enables verbose mode." << endl;
+ cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl;
+ cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;
cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl;
- cerr << " -d <destination ip> sets the destination ip." << endl;
- cerr << " -s <source port> sets the source port." << endl;
- cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
- cerr << " -t <ttl> set the packet's TTL." << endl << endl;
+ cerr << " -d <destination ip> Set the destination ip." << endl;
+ cerr << " -s <source port> Set the source port." << endl;
+ cerr << " -S <source ip> Select the source IP in case we want to use multicast." << endl;
+ cerr << " -t <ttl> Set the packet's TTL." << endl << endl;
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ cerr << "The input socket will be reset if no data is received for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -228,10 +233,11 @@ int start(int argc, char **argv)
int delay_ms = 500;
bool drop_late_packets = false;
+ uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;
int ch = 0;
while (ch != -1) {
- ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:x");
+ ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh");
switch (ch) {
case -1:
break;
@@ -276,6 +282,9 @@ int start(int argc, char **argv)
case 'a':
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
+ case 'b':
+ backoff_after_reset_ms = std::stoi(optarg);
+ break;
case 'w':
delay_ms = std::stoi(optarg);
break;
@@ -313,85 +322,92 @@ int start(int argc, char **argv)
const char* source_url = argv[optind];
-
- size_t frame_count = 0;
- size_t error_count = 0;
-
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
-
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ size_t num_consecutive_resets = 0;
+ while (true) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ size_t error_count = 0;
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ num_consecutive_resets = 0;
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
+
+ const int framesize = dab_msg->buflen[i];
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
- offset += framesize;
+ offset += framesize;
+ }
}
- }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
- frame_count++;
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ }
}
}
- }
- etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+ num_consecutive_resets++;
+
+ zmq_sock.close();
+ std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms));
+ etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
+ num_consecutive_resets << " consecutive resets.";
+ }
return 0;
}
diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp
index 16830a2..95dc074 100644
--- a/src/zmq2farsync/zmq2farsync.cpp
+++ b/src/zmq2farsync/zmq2farsync.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -28,13 +28,16 @@
#include "dabOutput/dabOutput.h"
#include "Log.h"
#include "zmq.hpp"
+#include <chrono>
#include <iostream>
+#include <thread>
#include <vector>
constexpr size_t MAX_ERROR_COUNT = 10;
+constexpr size_t MAX_NUM_RESETS = 180;
constexpr long ZMQ_TIMEOUT_MS = 1000;
-void usage(void)
+static void usage()
{
using namespace std;
@@ -46,8 +49,9 @@ void usage(void)
cerr << " <destination> is the device information for the FarSync card." << endl << endl;
cerr << " The syntax is the same as for ODR-DabMux" << endl << endl;
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ cerr << "The input socket will be reset if no data is received for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
+ cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -80,72 +84,81 @@ int main(int argc, char **argv)
etiLog.level(info) << "Opening ZMQ input: " << source_url;
zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
-
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- etiLog.level(info) << "Entering main loop";
size_t frame_count = 0;
size_t loop_counter = 0;
- size_t error_count = 0;
- while (error_count < MAX_ERROR_COUNT)
- {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
-
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ size_t num_consecutive_resets = 0;
+ while (num_consecutive_resets < MAX_NUM_RESETS) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ size_t error_count = 0;
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ num_consecutive_resets = 0;
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
- dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
-
- offset += framesize;
-
- if (output.Write(&buf.front(), buf.size()) == -1) {
- etiLog.level(error) << "Cannot write to output!";
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
+ dab_msg->buflen[i];
error_count++;
}
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
+
+ const int framesize = dab_msg->buflen[i];
+
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- frame_count++;
+ offset += framesize;
+
+ if (output.Write(&buf.front(), buf.size()) == -1) {
+ etiLog.level(error) << "Cannot write to output!";
+ error_count++;
+ }
+
+ frame_count++;
+ }
}
- }
- loop_counter++;
- if (loop_counter > 250) {
- etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
- loop_counter = 0;
+ loop_counter++;
+ if (loop_counter > 250) {
+ etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
+ loop_counter = 0;
+ }
}
}
+
+ num_consecutive_resets++;
+
+ zmq_sock.close();
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
+ num_consecutive_resets << " consecutive resets.";
}
- return error_count > 0 ? 2 : 0;
+ return 0;
}