From f75ac25e6e93ab167b5f3cfdecbbbf286fdc4fed Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 24 Jan 2014 14:11:19 +0100 Subject: Implement remote control and global_stats, and add to configuration --- src/DabMux.cpp | 15 +++++++++++++-- src/ParserConfigfile.cpp | 38 ++++++++++++++++++++++++++++++++------ src/ParserConfigfile.h | 8 +++++--- src/RemoteControl.h | 25 ++++++++++++++----------- src/StatsServer.h | 4 ++++ src/dabInputZmq.cpp | 12 +++++------- 6 files changed, 73 insertions(+), 29 deletions(-) diff --git a/src/DabMux.cpp b/src/DabMux.cpp index d1198f1..4e10287 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -123,7 +123,7 @@ typedef DWORD32 uint32_t; using namespace std; /* Global stats server */ -StatsServer global_stats(12720); //TODO define port +StatsServer* global_stats; static unsigned char Padding_FIB[] = { 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -288,6 +288,8 @@ int main(int argc, char *argv[]) vector::iterator output; dabProtection* protection = NULL; + BaseRemoteController* rc; + unsigned int currentFrame; int returnCode = 0; int result; @@ -312,6 +314,8 @@ int main(int argc, char *argv[]) bool enableTist = false; unsigned timestamp = 0; + int statsserverport = 0; + unsigned long time_seconds = 0; struct timeval mnsc_time; @@ -337,7 +341,7 @@ int main(int argc, char *argv[]) string conf_file = argv[2]; parse_configfile(conf_file, outputs, ensemble, &enableTist, &FICL, - &factumAnalyzer, &limit); + &factumAnalyzer, &limit, rc, &statsserverport); printSubchannels(ensemble->subchannels); cerr << endl; @@ -361,6 +365,13 @@ int main(int argc, char *argv[]) } } + if (statsserverport != 0) { + global_stats = new StatsServer(statsserverport); + } + else { + global_stats = new StatsServer(); + } + etiLog.log(info, "CRC-DABMUX starting up"); diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index e513fa3..df83df7 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -63,6 +63,7 @@ #include "dabInputDmbUdp.h" #include "dabInputZmq.h" #include "DabMux.h" +#include "StatsServer.h" #ifdef _WIN32 @@ -144,7 +145,9 @@ void parse_configfile(string configuration_file, bool* enableTist, unsigned* FICL, bool* factumAnalyzer, - unsigned long* limit + unsigned long* limit, + BaseRemoteController* rc, + int* statsServerPort ) { using boost::property_tree::ptree; @@ -183,6 +186,22 @@ void parse_configfile(string configuration_file, *enableTist = pt_general.get("tist", false); + *statsServerPort = pt_general.get("statsserverport", 0); + + /************** READ REMOTE CONTROL PARAMETERS *************/ + ptree pt_rc = pt.get_child("remotecontrol"); + int telnetport = 0; + if (pt_rc.get("telnet", 0)) { + telnetport = pt_rc.get("port", 0); + } + + if (telnetport != 0) { + rc = new RemoteControllerTelnet(telnetport); + } + else { + rc = new RemoteControllerDummy(); + } + /******************** READ ENSEMBLE PARAMETERS *************/ ptree pt_ensemble = pt.get_child("ensemble"); @@ -271,7 +290,7 @@ void parse_configfile(string configuration_file, ensemble->subchannels.push_back(subchan); try { - setup_subchannel_from_ptree(subchan, it->second, ensemble, subchanuid); + setup_subchannel_from_ptree(subchan, it->second, ensemble, subchanuid, rc); } catch (runtime_error &e) { etiLog.log(error, @@ -409,7 +428,8 @@ void parse_configfile(string configuration_file, void setup_subchannel_from_ptree(dabSubchannel* subchan, boost::property_tree::ptree &pt, dabEnsemble* ensemble, - string subchanuid) + string subchanuid, + BaseRemoteController* rc) { using boost::property_tree::ptree; using boost::property_tree::ptree_error; @@ -489,19 +509,25 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, } else if (strcmp(subchan->inputProto, "tcp") == 0) { input_is_old_style = false; - subchan->input = new DabInputZmq(subchanuid); + DabInputZmq* inzmq = new DabInputZmq(subchanuid); + inzmq->enrol_at(*rc); + subchan->input = inzmq; subchan->inputName = full_inputName; } else if (strcmp(subchan->inputProto, "epmg") == 0) { etiLog.level(warn) << "Using untested epmg:// zeromq input"; input_is_old_style = false; - subchan->input = new DabInputZmq(subchanuid); + DabInputZmq* inzmq = new DabInputZmq(subchanuid); + inzmq->enrol_at(*rc); + subchan->input = inzmq; subchan->inputName = full_inputName; } else if (strcmp(subchan->inputProto, "ipc") == 0) { etiLog.level(warn) << "Using untested ipc:// zeromq input"; input_is_old_style = false; - subchan->input = new DabInputZmq(subchanuid); + DabInputZmq* inzmq = new DabInputZmq(subchanuid); + inzmq->enrol_at(*rc); + subchan->input = inzmq; subchan->inputName = full_inputName; #endif // defined(HAVE_INPUT_ZEROMQ) } else { diff --git a/src/ParserConfigfile.h b/src/ParserConfigfile.h index a09d1fc..f91a545 100644 --- a/src/ParserConfigfile.h +++ b/src/ParserConfigfile.h @@ -41,13 +41,15 @@ void parse_configfile(std::string configuration_file, bool* enableTist, unsigned* FICL, bool* factumAnalyzer, - unsigned long* limit - ); + unsigned long* limit, + BaseRemoteController* rc, + int* statsServerPort); void setup_subchannel_from_ptree(dabSubchannel* subchan, boost::property_tree::ptree &pt, dabEnsemble* ensemble, - std::string uid); + string subchanuid, + BaseRemoteController* rc); #endif diff --git a/src/RemoteControl.h b/src/RemoteControl.h index 77dbff4..7dcda0a 100644 --- a/src/RemoteControl.h +++ b/src/RemoteControl.h @@ -123,20 +123,20 @@ class RemoteControllable { */ class RemoteControllerTelnet : public BaseRemoteController { public: - RemoteControllerTelnet(int port) { - m_port = port; - m_running = false; - }; + RemoteControllerTelnet() + : m_running(false), m_port(0) {} - void start() { - m_running = true; - m_child_thread = boost::thread(&RemoteControllerTelnet::process, this, 0); - } + RemoteControllerTelnet(int port) + : m_running(true), m_port(port), + m_child_thread(&RemoteControllerTelnet::process, this, 0) + {} - void stop() { + ~RemoteControllerTelnet() { m_running = false; - m_child_thread.interrupt(); - m_child_thread.join(); + if (m_port) { + m_child_thread.interrupt(); + m_child_thread.join(); + } } void process(long); @@ -151,6 +151,9 @@ class RemoteControllerTelnet : public BaseRemoteController { private: + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other); + RemoteControllerTelnet(const RemoteControllerTelnet& other); + vector tokenise_(string message) { vector all_tokens; diff --git a/src/StatsServer.h b/src/StatsServer.h index 5bbf327..8462e6a 100644 --- a/src/StatsServer.h +++ b/src/StatsServer.h @@ -80,6 +80,7 @@ struct InputStat class StatsServer { public: + StatsServer() : m_listenport(0), m_running(false) {} StatsServer(int listen_port) : m_listenport(listen_port), m_running(true), @@ -139,5 +140,8 @@ class StatsServer mutable boost::mutex m_mutex; }; + +extern StatsServer* global_stats; + #endif diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 4a2114a..10c0f98 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -51,8 +51,6 @@ #ifdef HAVE_INPUT_ZEROMQ -extern StatsServer global_stats; - int DabInputZmq::open(const std::string inputUri) { // Prepare the ZMQ socket to accept connections @@ -75,7 +73,7 @@ int DabInputZmq::open(const std::string inputUri) } // We want to appear in the statistics ! - global_stats.registerInput(m_name); + global_stats->registerInput(m_name); return 0; } @@ -94,7 +92,7 @@ int DabInputZmq::readFrame(void* buffer, int size) /* Notify of a buffer overrun, and drop some frames */ if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { - global_stats.notifyOverrun(m_name); + global_stats->notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer @@ -136,13 +134,13 @@ int DabInputZmq::readFrame(void* buffer, int size) m_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ - global_stats.notifyUnderrun(m_name); + global_stats->notifyUnderrun(m_name); memset(buffer, 0, size); return size; } // Save stats data in bytes, not in frames - global_stats.notifyBuffer(m_name, m_frame_buffer.size() * size); + global_stats->notifyBuffer(m_name, m_frame_buffer.size() * size); if (m_frame_buffer.empty()) { etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", @@ -151,7 +149,7 @@ int DabInputZmq::readFrame(void* buffer, int size) m_prebuffering = INPUT_ZMQ_PREBUFFERING; /* We have no data to give, we give a zeroed frame */ - global_stats.notifyUnderrun(m_name); + global_stats->notifyUnderrun(m_name); memset(buffer, 0, size); return size; } -- cgit v1.2.3