diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | README | 1 | ||||
-rw-r--r-- | configure.ac | 3 | ||||
-rw-r--r-- | doc/example.mux | 17 | ||||
-rw-r--r-- | src/DabMux.cpp | 41 | ||||
-rw-r--r-- | src/Log.cpp | 2 | ||||
-rw-r--r-- | src/Log.h | 2 | ||||
-rw-r--r-- | src/Makefile.am | 1 | ||||
-rw-r--r-- | src/MuxElements.h | 3 | ||||
-rw-r--r-- | src/ParserCmdline.cpp | 57 | ||||
-rw-r--r-- | src/ParserConfigfile.cpp | 120 | ||||
-rw-r--r-- | src/ParserConfigfile.h | 8 | ||||
-rw-r--r-- | src/RemoteControl.cpp | 226 | ||||
-rw-r--r-- | src/RemoteControl.h | 233 | ||||
-rw-r--r-- | src/StatsServer.h | 4 | ||||
-rw-r--r-- | src/UdpSocket.h | 2 | ||||
-rw-r--r-- | src/dabInput.h | 66 | ||||
-rw-r--r-- | src/dabInputZmq.cpp | 220 | ||||
-rw-r--r-- | src/dabInputZmq.h | 54 | ||||
-rw-r--r-- | src/dabOutput/dabOutput.h | 23 | ||||
-rw-r--r-- | src/dabOutput/dabOutputUdp.cpp | 3 |
21 files changed, 840 insertions, 247 deletions
@@ -28,3 +28,4 @@ cscope.out ctags tags .clang_complete +.ycm_extra_conf.py* @@ -36,6 +36,7 @@ to the official one: - a ZeroMQ dabplus input that can be used with fdk-aac-dabplus-zmq - supports logging to syslog - supports ZMQ input monitoring with munin tool +- supports a Telnet Remote Control for setting/getting parameters The src/ directory contains the source code of CRC-DabMux. diff --git a/configure.ac b/configure.ac index c552337..3ae8505 100644 --- a/configure.ac +++ b/configure.ac @@ -171,8 +171,7 @@ AS_IF([test "x$enable_output_tcp" = "xyes"], [AC_DEFINE(HAVE_OUTPUT_TCP, [1], [Define if TCP output is enabled])]) # RAW AC_ARG_ENABLE([output_raw], - [AS_HELP_STRING([--disable-output-raw], [Disable RAW output])], - [], [enable_output_raw=yes]) + AS_HELP_STRING([--enable-output-raw], [Enable RAW output])) AS_IF([test "x$enable_output_raw" = "xyes"], [AC_DEFINE(HAVE_OUTPUT_RAW, [1], [Define if RAW output is enabled])]) # SIMUL diff --git a/doc/example.mux b/doc/example.mux index ac6842e..eed3c11 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -32,6 +32,23 @@ general { ; Enable timestamp definition necessary for SFN ; This also enables time encoding using the MNSC. tist false + + ; The statsserver is a simple TCP server that can present + ; statistics data (buffers, overruns, underruns, etc) + ; which can then be graphed a tool like Munin + ; The doc/stats_dabmux_multi.py tool is a suitable + ; plugin for that. + ; If the port is zero, or the line commented, the server + ; is not started. + statsserverport 12720 +} + +remotecontrol { + ; enable the telnet remote control server on the given port + ; This server allows you to read and define parameters that + ; some features export + ; Set the port to 0 to disable the server + telnetport 12721 } ; Some ensemble parameters diff --git a/src/DabMux.cpp b/src/DabMux.cpp index f3edd39..4e10287 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -74,8 +74,7 @@ typedef DWORD32 uint32_t; # include <sys/times.h> # include <sys/resource.h> -# include <linux/if_packet.h> -# include <linux/netdevice.h> +# include <net/if_packet.h> #endif #ifdef _WIN32 @@ -119,11 +118,12 @@ typedef DWORD32 uint32_t; #include "ParserConfigfile.h" #include "StatsServer.h" #include "Log.h" +#include "RemoteControl.h" using namespace std; /* Global stats server */ -StatsServer global_stats(12720); //TODO define port +StatsServer* global_stats; static unsigned char Padding_FIB[] = { 0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -288,6 +288,8 @@ int main(int argc, char *argv[]) vector<dabOutput*>::iterator output; dabProtection* protection = NULL; + BaseRemoteController* rc; + unsigned int currentFrame; int returnCode = 0; int result; @@ -312,6 +314,8 @@ int main(int argc, char *argv[]) bool enableTist = false; unsigned timestamp = 0; + int statsserverport = 0; + unsigned long time_seconds = 0; struct timeval mnsc_time; @@ -337,7 +341,7 @@ int main(int argc, char *argv[]) string conf_file = argv[2]; parse_configfile(conf_file, outputs, ensemble, &enableTist, &FICL, - &factumAnalyzer, &limit); + &factumAnalyzer, &limit, rc, &statsserverport); printSubchannels(ensemble->subchannels); cerr << endl; @@ -361,6 +365,13 @@ int main(int argc, char *argv[]) } } + if (statsserverport != 0) { + global_stats = new StatsServer(statsserverport); + } + else { + global_stats = new StatsServer(); + } + etiLog.log(info, "CRC-DABMUX starting up"); @@ -531,17 +542,14 @@ int main(int argc, char *argv[]) (*subchannel)->startAddress = (*(subchannel - 1))->startAddress + getSizeCu(*(subchannel - 1)); } - if ((*subchannel)->operations.open((*subchannel)->data, - (*subchannel)->inputName) == -1) { + if ((*subchannel)->input->open((*subchannel)->inputName) == -1) { perror((*subchannel)->inputName); returnCode = -1; goto EXIT; } // TODO Check errors - int result = (*subchannel)->operations.setBitrate( - &(*subchannel)->operations, (*subchannel)->data, - (*subchannel)->bitrate); + result = (*subchannel)->input->setBitrate( (*subchannel)->bitrate); if (result <= 0) { etiLog.log(error, "can't set bitrate for source %s\n", (*subchannel)->inputName); @@ -1718,15 +1726,8 @@ int main(int argc, char *argv[]) subchannel != ensemble->subchannels.end(); ++subchannel) { int sizeSubchannel = getSizeByte(*subchannel); - if ((*subchannel)->operations.lock != NULL) { - (*subchannel)->operations.lock((*subchannel)->data); - } - result = (*subchannel)->operations.readFrame( - &(*subchannel)->operations, - (*subchannel)->data, &etiFrame[index], sizeSubchannel); - if ((*subchannel)->operations.unlock != NULL) { - (*subchannel)->operations.unlock((*subchannel)->data); - } + result = (*subchannel)->input->readFrame( + &etiFrame[index], sizeSubchannel); if (result < 0) { etiLog.log(info, "Subchannel %d read failed at ETI frame number: %i\n", (*subchannel)->id, currentFrame); } @@ -1823,8 +1824,8 @@ EXIT: for (subchannel = ensemble->subchannels.begin(); subchannel != ensemble->subchannels.end(); ++subchannel) { - (*subchannel)->operations.close((*subchannel)->data); - (*subchannel)->operations.clean(&(*subchannel)->data); + (*subchannel)->input->close(); + delete (*subchannel)->input; } for (output = outputs.begin() ; output != outputs.end(); ++output) { if ((*output)->output) { diff --git a/src/Log.cpp b/src/Log.cpp index 30cc1f7..e428a0b 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -70,7 +70,7 @@ void Logger::logstr(log_level_t level, std::string message) for (std::list<LogBackend*>::iterator it = backends.begin(); it != backends.end(); - it++) { + ++it) { (*it)->log(level, message); } @@ -135,7 +135,7 @@ class Logger { /* Log the message to all backends */ void log(log_level_t level, const char* fmt, ...); - void logstr(log_level_t level, const std::string message); + void logstr(log_level_t level, std::string message); /* Return a LogLine for the given level * so that you can write etiLog.level(info) << "stuff = " << 21 */ diff --git a/src/Makefile.am b/src/Makefile.am index 910d5bc..3bcf54e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -71,6 +71,7 @@ crc_dabmux_SOURCES =DabMux.cpp \ bridge.h bridge.c \ utils.cpp utils.h \ MuxElements.cpp MuxElements.h \ + RemoteControl.cpp RemoteControl.h \ ParserCmdline.cpp ParserCmdline.h \ ParserConfigfile.cpp ParserConfigfile.h \ Eti.h Eti.cpp \ diff --git a/src/MuxElements.h b/src/MuxElements.h index 539d955..5556c0e 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -96,8 +96,7 @@ struct dabProtection { struct dabSubchannel { const char* inputProto; const char* inputName; - void* data; - dabInputOperations operations; + DabInputBase* input; unsigned char id; unsigned char type; uint16_t startAddress; diff --git a/src/ParserCmdline.cpp b/src/ParserCmdline.cpp index 44736c3..969f98e 100644 --- a/src/ParserCmdline.cpp +++ b/src/ParserCmdline.cpp @@ -107,6 +107,8 @@ bool parse_cmdline(char **argv, vector<dabService*>::iterator service = ensemble->services.end(); dabProtection* protection = NULL; + dabInputOperations operations; + int scids_temp = 0; char* progName = strrchr(argv[0], '/'); @@ -217,7 +219,7 @@ bool parse_cmdline(char **argv, (*subchannel)->inputProto = "file"; (*subchannel)->type = 0; (*subchannel)->bitrate = 0; - (*subchannel)->operations = dabInputMpegFileOperations; + operations = dabInputMpegFileOperations; #endif // defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_MPEG) #if defined(HAVE_FORMAT_DABPLUS) } else if (c == 'F') { @@ -238,7 +240,7 @@ bool parse_cmdline(char **argv, if (0) { #if defined(HAVE_INPUT_FILE) } else if (strcmp((*subchannel)->inputProto, "file") == 0) { - (*subchannel)->operations = dabInputDabplusFileOperations; + operations = dabInputDabplusFileOperations; #endif // defined(HAVE_INPUT_FILE) } else { etiLog.log(error, @@ -263,11 +265,11 @@ bool parse_cmdline(char **argv, #if defined(HAVE_FORMAT_BRIDGE) #if defined(HAVE_INPUT_UDP) } else if (strcmp((*subchannel)->inputProto, "udp") == 0) { - (*subchannel)->operations = dabInputBridgeUdpOperations; + operations = dabInputBridgeUdpOperations; #endif // defined(HAVE_INPUT_UDP) #if defined(HAVE_INPUT_SLIP) } else if (strcmp((*subchannel)->inputProto, "slip") == 0) { - (*subchannel)->operations = dabInputSlipOperations; + operations = dabInputSlipOperations; #endif // defined(HAVE_INPUT_SLIP) #endif // defined(HAVE_FORMAT_BRIDGE) } @@ -285,18 +287,18 @@ bool parse_cmdline(char **argv, if (0) { #if defined(HAVE_INPUT_UDP) } else if (strcmp((*subchannel)->inputProto, "udp") == 0) { - (*subchannel)->operations = dabInputUdpOperations; + operations = dabInputUdpOperations; #endif #if defined(HAVE_INPUT_PRBS) && defined(HAVE_FORMAT_RAW) } else if (strcmp((*subchannel)->inputProto, "prbs") == 0) { - (*subchannel)->operations = dabInputPrbsOperations; + operations = dabInputPrbsOperations; #endif #if defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_RAW) } else if (strcmp((*subchannel)->inputProto, "file") == 0) { - (*subchannel)->operations = dabInputRawFileOperations; + operations = dabInputRawFileOperations; #endif } else if (strcmp((*subchannel)->inputProto, "fifo") == 0) { - (*subchannel)->operations = dabInputRawFifoOperations; + operations = dabInputRawFifoOperations; } else { etiLog.log(error, "Invalid protocol for data input (%s)\n", @@ -312,7 +314,7 @@ bool parse_cmdline(char **argv, (*subchannel)->inputProto = "test"; (*subchannel)->type = 1; (*subchannel)->bitrate = DEFAULT_DATA_BITRATE; - (*subchannel)->operations = dabInputTestOperations; + operations = dabInputTestOperations; #endif // defined(HAVE_INPUT_TEST)) && defined(HAVE_FORMAT_RAW) #ifdef HAVE_FORMAT_PACKET } else if (c == 'P') { @@ -320,9 +322,9 @@ bool parse_cmdline(char **argv, (*subchannel)->type = 3; (*subchannel)->bitrate = DEFAULT_PACKET_BITRATE; #ifdef HAVE_INPUT_FILE - (*subchannel)->operations = dabInputPacketFileOperations; + operations = dabInputPacketFileOperations; #elif defined(HAVE_INPUT_FIFO) - (*subchannel)->operations = dabInputFifoOperations; + operations = dabInputFifoOperations; #else # pragma error("Must defined at least one packet input") #endif // defined(HAVE_INPUT_FILE) @@ -331,7 +333,7 @@ bool parse_cmdline(char **argv, (*subchannel)->inputProto = "file"; (*subchannel)->type = 3; (*subchannel)->bitrate = DEFAULT_PACKET_BITRATE; - (*subchannel)->operations = dabInputEnhancedPacketFileOperations; + operations = dabInputEnhancedPacketFileOperations; #endif // defined(HAVE_FORMAT_EPM) #endif // defined(HAVE_FORMAT_PACKET) #ifdef HAVE_FORMAT_DMB @@ -347,9 +349,9 @@ bool parse_cmdline(char **argv, *proto = 0; } if (strcmp((*subchannel)->inputProto, "udp") == 0) { - (*subchannel)->operations = dabInputDmbUdpOperations; + operations = dabInputDmbUdpOperations; } else if (strcmp((*subchannel)->inputProto, "file") == 0) { - (*subchannel)->operations = dabInputDmbFileOperations; + operations = dabInputDmbFileOperations; } else { etiLog.log(error, "Invalid protocol for DMB input (%s)\n", @@ -366,7 +368,8 @@ bool parse_cmdline(char **argv, "Service '%c' not yet coded!\n", c); goto EXIT; } - (*subchannel)->operations.init(&(*subchannel)->data); + (*subchannel)->input = new DabInputCompatible(operations); + for (int i = 0; i < 64; ++i) { // Find first free subchannel subchannel = getSubchannel(ensemble->subchannels, i); if (subchannel == ensemble->subchannels.end()) { @@ -588,36 +591,36 @@ bool parse_cmdline(char **argv, switch ((*subchannel)->type) { #ifdef HAVE_FORMAT_PACKET case 3: - (*subchannel)->operations.clean(&(*subchannel)->data); - if ((*subchannel)->operations == dabInputPacketFileOperations) { - (*subchannel)->operations = dabInputFifoOperations; + if ( ((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputPacketFileOperations) { + operations = dabInputFifoOperations; #ifdef HAVE_FORMAT_EPM - } else if ((*subchannel)->operations == dabInputEnhancedPacketFileOperations) { - (*subchannel)->operations = dabInputEnhancedFifoOperations; + } else if ( ((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputEnhancedPacketFileOperations) { + operations = dabInputEnhancedFifoOperations; #endif // defined(HAVE_FORMAT_EPM) } else { etiLog.log(error, "Error, wrong packet subchannel operations!\n"); goto EXIT; } - (*subchannel)->operations.init(&(*subchannel)->data); + delete (*subchannel)->input; + (*subchannel)->input = new DabInputCompatible(operations); (*subchannel)->inputProto = "fifo"; break; #endif // defined(HAVE_FORMAT_PACKET) #ifdef HAVE_FORMAT_MPEG case 0: - (*subchannel)->operations.clean(&(*subchannel)->data); - if ((*subchannel)->operations == dabInputMpegFileOperations) { - (*subchannel)->operations = dabInputMpegFifoOperations; - } else if ((*subchannel)->operations == + if ( ((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputMpegFileOperations) { + operations = dabInputMpegFifoOperations; + } else if (((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputDabplusFileOperations) { - (*subchannel)->operations = dabInputDabplusFifoOperations; + operations = dabInputDabplusFifoOperations; } else { etiLog.log(error, "Error, wrong audio subchannel operations!\n"); goto EXIT; } - (*subchannel)->operations.init(&(*subchannel)->data); + delete (*subchannel)->input; + (*subchannel)->input = new DabInputCompatible(operations); (*subchannel)->inputProto = "fifo"; break; #endif // defined(HAVE_FORMAT_MPEG) diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp index ca9983b..3ac6537 100644 --- a/src/ParserConfigfile.cpp +++ b/src/ParserConfigfile.cpp @@ -63,6 +63,7 @@ #include "dabInputDmbUdp.h" #include "dabInputZmq.h" #include "DabMux.h" +#include "StatsServer.h" #ifdef _WIN32 @@ -144,7 +145,9 @@ void parse_configfile(string configuration_file, bool* enableTist, unsigned* FICL, bool* factumAnalyzer, - unsigned long* limit + unsigned long* limit, + BaseRemoteController* rc, + int* statsServerPort ) { using boost::property_tree::ptree; @@ -183,6 +186,19 @@ void parse_configfile(string configuration_file, *enableTist = pt_general.get("tist", false); + *statsServerPort = pt_general.get<int>("statsserverport", 0); + + /************** READ REMOTE CONTROL PARAMETERS *************/ + ptree pt_rc = pt.get_child("remotecontrol"); + int telnetport = pt_rc.get<int>("telnetport", 0); + + if (telnetport != 0) { + rc = new RemoteControllerTelnet(telnetport); + } + else { + rc = new RemoteControllerDummy(); + } + /******************** READ ENSEMBLE PARAMETERS *************/ ptree pt_ensemble = pt.get_child("ensemble"); @@ -213,7 +229,7 @@ void parse_configfile(string configuration_file, ptree pt_services = pt.get_child("services"); for (ptree::iterator it = pt_services.begin(); - it != pt_services.end(); it++) { + it != pt_services.end(); ++it) { string serviceuid = it->first; ptree pt_service = it->second; dabService* service = new dabService(); @@ -264,14 +280,14 @@ void parse_configfile(string configuration_file, map<string, dabSubchannel*> allsubchans; ptree pt_subchans = pt.get_child("subchannels"); - for (ptree::iterator it = pt_subchans.begin(); it != pt_subchans.end(); it++) { + for (ptree::iterator it = pt_subchans.begin(); it != pt_subchans.end(); ++it) { string subchanuid = it->first; dabSubchannel* subchan = new dabSubchannel(); ensemble->subchannels.push_back(subchan); try { - setup_subchannel_from_ptree(subchan, it->second, ensemble, subchanuid); + setup_subchannel_from_ptree(subchan, it->second, ensemble, subchanuid, rc); } catch (runtime_error &e) { etiLog.log(error, @@ -294,7 +310,7 @@ void parse_configfile(string configuration_file, /******************** READ COMPONENT PARAMETERS ************/ map<string, dabComponent*> allcomponents; ptree pt_components = pt.get_child("components"); - for (ptree::iterator it = pt_components.begin(); it != pt_components.end(); it++) { + for (ptree::iterator it = pt_components.begin(); it != pt_components.end(); ++it) { string componentuid = it->first; ptree pt_comp = it->second; @@ -371,7 +387,7 @@ void parse_configfile(string configuration_file, /******************** READ OUTPUT PARAMETERS ***************/ map<string, dabOutput*> alloutputs; ptree pt_outputs = pt.get_child("outputs"); - for (ptree::iterator it = pt_outputs.begin(); it != pt_outputs.end(); it++) { + for (ptree::iterator it = pt_outputs.begin(); it != pt_outputs.end(); ++it) { string outputuid = it->first; string uri = pt_outputs.get<string>(outputuid); @@ -409,7 +425,8 @@ void parse_configfile(string configuration_file, void setup_subchannel_from_ptree(dabSubchannel* subchan, boost::property_tree::ptree &pt, dabEnsemble* ensemble, - string subchanuid) + string subchanuid, + BaseRemoteController* rc) { using boost::property_tree::ptree; using boost::property_tree::ptree_error; @@ -444,7 +461,12 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, subchan->inputName = inputName; - + /* The input is of the old_style type, + * with the struct of function pointers, + * and needs to be a DabInputCompatible + */ + bool input_is_old_style = true; + dabInputOperations operations; dabProtection* protection = &subchan->protection; if (0) { @@ -453,7 +475,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, subchan->inputProto = "file"; subchan->type = 0; subchan->bitrate = 0; - subchan->operations = dabInputMpegFileOperations; + operations = dabInputMpegFileOperations; #endif // defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_MPEG) #if defined(HAVE_FORMAT_DABPLUS) } else if (type == "dabplus") { @@ -462,6 +484,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, char* proto; + char* full_inputName = new char[256]; + full_inputName[255] = '\0'; + memcpy(full_inputName, inputName, 255); + proto = strstr(inputName, "://"); if (proto == NULL) { subchan->inputProto = "file"; @@ -474,22 +500,32 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, if (0) { #if defined(HAVE_INPUT_FILE) } else if (strcmp(subchan->inputProto, "file") == 0) { - subchan->operations = dabInputDabplusFileOperations; + operations = dabInputDabplusFileOperations; #endif // defined(HAVE_INPUT_FILE) #if defined(HAVE_INPUT_ZEROMQ) } else if (strcmp(subchan->inputProto, "tcp") == 0) { - subchan->operations = dabInputZmqOperations; + input_is_old_style = false; + DabInputZmq* inzmq = new DabInputZmq(subchanuid); + inzmq->enrol_at(*rc); + subchan->input = inzmq; + subchan->inputName = full_inputName; } else if (strcmp(subchan->inputProto, "epmg") == 0) { - etiLog.log(warn, - "Using untested epmg:// zeromq input\n"); - subchan->operations = dabInputZmqOperations; + etiLog.level(warn) << "Using untested epmg:// zeromq input"; + input_is_old_style = false; + DabInputZmq* inzmq = new DabInputZmq(subchanuid); + inzmq->enrol_at(*rc); + subchan->input = inzmq; + subchan->inputName = full_inputName; } else if (strcmp(subchan->inputProto, "ipc") == 0) { - etiLog.log(warn, - "Using untested ipc:// zeromq input\n"); - subchan->operations = dabInputZmqOperations; + etiLog.level(warn) << "Using untested ipc:// zeromq input"; + input_is_old_style = false; + DabInputZmq* inzmq = new DabInputZmq(subchanuid); + inzmq->enrol_at(*rc); + subchan->input = inzmq; + subchan->inputName = full_inputName; #endif // defined(HAVE_INPUT_ZEROMQ) } else { stringstream ss; @@ -514,11 +550,11 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, #if defined(HAVE_FORMAT_BRIDGE) #if defined(HAVE_INPUT_UDP) } else if (strcmp(subchan->inputProto, "udp") == 0) { - subchan->operations = dabInputBridgeUdpOperations; + operations = dabInputBridgeUdpOperations; #endif // defined(HAVE_INPUT_UDP) #if defined(HAVE_INPUT_SLIP) } else if (strcmp(subchan->inputProto, "slip") == 0) { - subchan->operations = dabInputSlipOperations; + operations = dabInputSlipOperations; #endif // defined(HAVE_INPUT_SLIP) #endif // defined(HAVE_FORMAT_BRIDGE) } @@ -536,18 +572,18 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, if (0) { #if defined(HAVE_INPUT_UDP) } else if (strcmp(subchan->inputProto, "udp") == 0) { - subchan->operations = dabInputUdpOperations; + operations = dabInputUdpOperations; #endif #if defined(HAVE_INPUT_PRBS) && defined(HAVE_FORMAT_RAW) } else if (strcmp(subchan->inputProto, "prbs") == 0) { - subchan->operations = dabInputPrbsOperations; + operations = dabInputPrbsOperations; #endif #if defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_RAW) } else if (strcmp(subchan->inputProto, "file") == 0) { - subchan->operations = dabInputRawFileOperations; + operations = dabInputRawFileOperations; #endif } else if (strcmp(subchan->inputProto, "fifo") == 0) { - subchan->operations = dabInputRawFifoOperations; + operations = dabInputRawFifoOperations; } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << @@ -563,7 +599,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, subchan->inputProto = "test"; subchan->type = 1; subchan->bitrate = DEFAULT_DATA_BITRATE; - subchan->operations = dabInputTestOperations; + operations = dabInputTestOperations; #endif // defined(HAVE_INPUT_TEST)) && defined(HAVE_FORMAT_RAW) #ifdef HAVE_FORMAT_PACKET } else if (type == "packet") { @@ -571,9 +607,9 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, subchan->type = 3; subchan->bitrate = DEFAULT_PACKET_BITRATE; #ifdef HAVE_INPUT_FILE - subchan->operations = dabInputPacketFileOperations; + operations = dabInputPacketFileOperations; #elif defined(HAVE_INPUT_FIFO) - subchan->operations = dabInputFifoOperations; + operations = dabInputFifoOperations; #else # pragma error("Must defined at least one packet input") #endif // defined(HAVE_INPUT_FILE) @@ -582,7 +618,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, subchan->inputProto = "file"; subchan->type = 3; subchan->bitrate = DEFAULT_PACKET_BITRATE; - subchan->operations = dabInputEnhancedPacketFileOperations; + operations = dabInputEnhancedPacketFileOperations; #endif // defined(HAVE_FORMAT_EPM) #endif // defined(HAVE_FORMAT_PACKET) #ifdef HAVE_FORMAT_DMB @@ -598,9 +634,9 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, *proto = 0; } if (strcmp(subchan->inputProto, "udp") == 0) { - subchan->operations = dabInputDmbUdpOperations; + operations = dabInputDmbUdpOperations; } else if (strcmp(subchan->inputProto, "file") == 0) { - subchan->operations = dabInputDmbFileOperations; + operations = dabInputDmbFileOperations; } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << @@ -617,7 +653,6 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, ss << "Subchannel with uid " << subchanuid << " has unknown type!"; throw runtime_error(ss.str()); } - subchan->operations.init(&subchan->data); subchan->startAddress = 0; if (type == "audio") { @@ -649,37 +684,33 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, switch (subchan->type) { #ifdef HAVE_FORMAT_PACKET case 3: - subchan->operations.clean(&subchan->data); - if (subchan->operations == dabInputPacketFileOperations) { - subchan->operations = dabInputFifoOperations; + if (operations == dabInputPacketFileOperations) { + operations = dabInputFifoOperations; #ifdef HAVE_FORMAT_EPM - } else if (subchan->operations == dabInputEnhancedPacketFileOperations) { - subchan->operations = dabInputEnhancedFifoOperations; + } else if (operations == dabInputEnhancedPacketFileOperations) { + operations = dabInputEnhancedFifoOperations; #endif // defined(HAVE_FORMAT_EPM) } else { etiLog.log(error, "Error, wrong packet subchannel operations!\n"); throw runtime_error("Error, wrong packet subchannel operations!\n"); } - subchan->operations.init(&subchan->data); subchan->inputProto = "fifo"; break; #endif // defined(HAVE_FORMAT_PACKET) #ifdef HAVE_FORMAT_MPEG case 0: - subchan->operations.clean(&subchan->data); - if (subchan->operations == dabInputMpegFileOperations) { - subchan->operations = dabInputMpegFifoOperations; - } else if (subchan->operations == + if (operations == dabInputMpegFileOperations) { + operations = dabInputMpegFifoOperations; + } else if (operations == dabInputDabplusFileOperations) { - subchan->operations = dabInputDabplusFifoOperations; + operations = dabInputDabplusFifoOperations; } else { etiLog.log(error, "Error, wrong audio subchannel operations!\n"); throw runtime_error( "Error, wrong audio subchannel operations!\n"); } - subchan->operations.init(&subchan->data); subchan->inputProto = "fifo"; break; #endif // defined(HAVE_FORMAT_MPEG) @@ -735,4 +766,9 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan, } catch (ptree_error &e) {} + /* Create object */ + if (input_is_old_style) { + subchan->input = new DabInputCompatible(operations); + } + // else { it's already been created! } } diff --git a/src/ParserConfigfile.h b/src/ParserConfigfile.h index a09d1fc..f91a545 100644 --- a/src/ParserConfigfile.h +++ b/src/ParserConfigfile.h @@ -41,13 +41,15 @@ void parse_configfile(std::string configuration_file, bool* enableTist, unsigned* FICL, bool* factumAnalyzer, - unsigned long* limit - ); + unsigned long* limit, + BaseRemoteController* rc, + int* statsServerPort); void setup_subchannel_from_ptree(dabSubchannel* subchan, boost::property_tree::ptree &pt, dabEnsemble* ensemble, - std::string uid); + string subchanuid, + BaseRemoteController* rc); #endif diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp new file mode 100644 index 0000000..7cc975a --- /dev/null +++ b/src/RemoteControl.cpp @@ -0,0 +1,226 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012 + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ +#include <list> +#include <string> +#include <string> +#include <boost/asio.hpp> +#include "Log.h" + +#include "RemoteControl.h" + +using boost::asio::ip::tcp; + +void +RemoteControllerTelnet::process(long) +{ + m_welcome = "CRC-DABMUX Remote Control CLI\nWrite 'help' for help.\n**********\n"; + m_prompt = "> "; + + std::string in_message; + size_t length; + + try { + boost::asio::io_service io_service; + tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), m_port)); + + while (m_running) { + in_message = ""; + + tcp::socket socket(io_service); + + acceptor.accept(socket); + + boost::system::error_code ignored_error; + + boost::asio::write(socket, boost::asio::buffer(m_welcome), + boost::asio::transfer_all(), + ignored_error); + + while (m_running && in_message != "quit") { + boost::asio::write(socket, boost::asio::buffer(m_prompt), + boost::asio::transfer_all(), + ignored_error); + + in_message = ""; + + boost::asio::streambuf buffer; + length = boost::asio::read_until( socket, buffer, "\n", ignored_error); + + std::istream str(&buffer); + std::getline(str, in_message); + + if (length == 0) { + etiLog.level(info) << "RC: Connection terminated"; + break; + } + + while (in_message.length() > 0 && + (in_message[in_message.length()-1] == '\r' || + in_message[in_message.length()-1] == '\n')) { + in_message.erase(in_message.length()-1, 1); + } + + if (in_message.length() == 0) { + continue; + } + + etiLog.level(info) << "RC: Got message '" << in_message << "'"; + + dispatch_command(socket, in_message); + } + etiLog.level(info) << "RC: Closing socket"; + socket.close(); + } + } + catch (std::exception& e) + { + etiLog.level(error) << "Remote control caught exception: " << e.what(); + } +} + +void +RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) +{ + vector<string> cmd = tokenise_(command); + + if (cmd[0] == "help") { + reply(socket, + "The following commands are supported:\n" + " list\n" + " * Lists the modules that are loaded\n" + " list MODULE\n" + " * Lists the parameters exported by module MODULE\n" + " show MODULE\n" + " * Lists all parameters and their values from module MODULE\n" + " get MODULE PARAMETER\n" + " * Gets the value for the specified PARAMETER from module MODULE\n" + " set MODULE PARAMETER VALUE\n" + " * Sets the value for the PARAMETER ofr module MODULE\n" + " quit\n" + " * Terminate this session\n" + "\n"); + } + else if (cmd[0] == "list") { + stringstream ss; + + if (cmd.size() == 1) { + for (list<RemoteControllable*>::iterator it = m_cohort.begin(); + it != m_cohort.end(); ++it) { + ss << (*it)->get_rc_name() << " "; + } + } + else if (cmd.size() == 2) { + try { + stringstream ss; + + list< vector<string> > params = get_parameter_descriptions_(cmd[1]); + for (list< vector<string> >::iterator it = params.begin(); + it != params.end(); ++it) { + ss << (*it)[0] << " : " << (*it)[1] << endl; + } + reply(socket, ss.str()); + } + catch (ParameterError &e) { + reply(socket, e.what()); + } + } + else { + reply(socket, "Too many arguments for command 'list'"); + } + + reply(socket, ss.str()); + } + else if (cmd[0] == "show") { + if (cmd.size() == 2) { + try { + stringstream ss; + list< vector<string> > r = get_param_list_values_(cmd[1]); + for (list< vector<string> >::iterator it = r.begin(); + it != r.end(); ++it) { + ss << (*it)[0] << ": " << (*it)[1] << endl; + } + reply(socket, ss.str()); + + } + catch (ParameterError &e) { + reply(socket, e.what()); + } + } + else + { + reply(socket, "Incorrect parameters for command 'show'"); + } + } + else if (cmd[0] == "get") { + if (cmd.size() == 3) { + try { + string r = get_param_(cmd[1], cmd[2]); + reply(socket, r); + } + catch (ParameterError &e) { + reply(socket, e.what()); + } + } + else + { + reply(socket, "Incorrect parameters for command 'get'"); + } + } + else if (cmd[0] == "set") { + if (cmd.size() == 4) { + try { + set_param_(cmd[1], cmd[2], cmd[3]); + reply(socket, "ok"); + } + catch (ParameterError &e) { + reply(socket, e.what()); + } + catch (exception &e) { + reply(socket, "Error: Invalid parameter value. "); + } + } + else + { + reply(socket, "Incorrect parameters for command 'get'"); + } + } + else if (cmd[0] == "quit") { + reply(socket, "Goodbye"); + } + else { + reply(socket, "Message not understood"); + } +} + +void +RemoteControllerTelnet::reply(tcp::socket& socket, string message) +{ + boost::system::error_code ignored_error; + stringstream ss; + ss << message << "\r\n"; + boost::asio::write(socket, boost::asio::buffer(ss.str()), + boost::asio::transfer_all(), + ignored_error); +} + diff --git a/src/RemoteControl.h b/src/RemoteControl.h new file mode 100644 index 0000000..d0965e0 --- /dev/null +++ b/src/RemoteControl.h @@ -0,0 +1,233 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012 + + This module adds remote-control capability to some of the dabmod modules. + see testremotecontrol/test.cpp for an example of how to use this. + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _REMOTECONTROL_H +#define _REMOTECONTROL_H + +#include <list> +#include <map> +#include <string> +#include <string> +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/asio.hpp> +#include <boost/foreach.hpp> +#include <boost/tokenizer.hpp> +#include <boost/thread.hpp> +#include <stdexcept> + + +#define RC_ADD_PARAMETER(p, desc) { \ + vector<string> p; \ + p.push_back(#p); \ + p.push_back(desc); \ + m_parameters.push_back(p); \ +} + + +using namespace std; +using boost::asio::ip::tcp; + +class ParameterError : public std::exception +{ + public: + ParameterError(string message) : m_message(message) {} + ~ParameterError() throw() {}; + const char* what() const throw() { return m_message.c_str(); } + + private: + string m_message; +}; + +class RemoteControllable; + +/* Remote controllers (that recieve orders from the user) must implement BaseRemoteController */ +class BaseRemoteController { + public: + /* Add a new controllable under this controller's command */ + virtual void enrol(RemoteControllable* controllable) = 0; +}; + +/* Objects that support remote control must implement the following class */ +class RemoteControllable { + public: + + RemoteControllable(string name) : m_name(name) {} + + /* return a short name used to identify the controllable. + * It might be used in the commands the user has to type, so keep + * it short + */ + virtual std::string get_rc_name() { return m_name; } + + /* Tell the controllable to enrol at the given controller */ + virtual void enrol_at(BaseRemoteController& controller) { + controller.enrol(this); + } + + /* Return a list of possible parameters that can be set */ + virtual list<string> get_supported_parameters() { + list<string> parameterlist; + for (list< vector<string> >::iterator it = m_parameters.begin(); + it != m_parameters.end(); ++it) { + parameterlist.push_back((*it)[0]); + } + return parameterlist; + } + + /* Return a mapping of the descriptions of all parameters */ + virtual std::list< std::vector<std::string> > get_parameter_descriptions() { + return m_parameters; + } + + /* Base function to set parameters. */ + virtual void set_parameter(string parameter, string value) = 0; + + /* Getting a parameter always returns a string. */ + virtual string get_parameter(string parameter) = 0; + + protected: + std::string m_name; + std::list< std::vector<std::string> > m_parameters; +}; + +/* Implements a Remote controller based on a simple telnet CLI + * that listens on localhost + */ +class RemoteControllerTelnet : public BaseRemoteController { + public: + RemoteControllerTelnet() + : m_running(false), m_port(0) {} + + RemoteControllerTelnet(int port) + : m_running(true), m_port(port), + m_child_thread(&RemoteControllerTelnet::process, this, 0) + {} + + ~RemoteControllerTelnet() { + m_running = false; + if (m_port) { + m_child_thread.interrupt(); + m_child_thread.join(); + } + } + + void process(long); + + void dispatch_command(tcp::socket& socket, string command); + + void reply(tcp::socket& socket, string message); + + void enrol(RemoteControllable* controllable) { + m_cohort.push_back(controllable); + } + + + private: + RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other); + RemoteControllerTelnet(const RemoteControllerTelnet& other); + + vector<string> tokenise_(string message) { + vector<string> all_tokens; + + boost::char_separator<char> sep(" "); + boost::tokenizer< boost::char_separator<char> > tokens(message, sep); + BOOST_FOREACH (const string& t, tokens) { + all_tokens.push_back(t); + } + return all_tokens; + } + + RemoteControllable* get_controllable_(string name) { + for (list<RemoteControllable*>::iterator it = m_cohort.begin(); + it != m_cohort.end(); ++it) { + if ((*it)->get_rc_name() == name) + { + return *it; + } + } + throw ParameterError("Module name unknown"); + } + + list< vector<string> > get_parameter_descriptions_(string name) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_parameter_descriptions(); + } + + list<string> get_param_list_(string name) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_supported_parameters(); + } + + list< vector<string> > get_param_list_values_(string name) { + RemoteControllable* controllable = get_controllable_(name); + + list< vector<string> > allparams; + list<string> params = controllable->get_supported_parameters(); + for (list<string>::iterator it = params.begin(); it != params.end(); ++it) { + vector<string> item; + item.push_back(*it); + item.push_back(controllable->get_parameter(*it)); + + allparams.push_back(item); + } + return allparams; + } + + string get_param_(string name, string param) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->get_parameter(param); + } + + void set_param_(string name, string param, string value) { + RemoteControllable* controllable = get_controllable_(name); + return controllable->set_parameter(param, value); + } + + bool m_running; + boost::thread m_child_thread; + + /* This controller commands the controllables in the cohort */ + list<RemoteControllable*> m_cohort; + + std::string m_welcome; + std::string m_prompt; + + int m_port; +}; + + +/* The Dummy remote controller does nothing + */ +class RemoteControllerDummy : public BaseRemoteController { + public: + void enrol(RemoteControllable* controllable) {}; +}; + +#endif + diff --git a/src/StatsServer.h b/src/StatsServer.h index 5bbf327..8462e6a 100644 --- a/src/StatsServer.h +++ b/src/StatsServer.h @@ -80,6 +80,7 @@ struct InputStat class StatsServer { public: + StatsServer() : m_listenport(0), m_running(false) {} StatsServer(int listen_port) : m_listenport(listen_port), m_running(true), @@ -139,5 +140,8 @@ class StatsServer mutable boost::mutex m_mutex; }; + +extern StatsServer* global_stats; + #endif diff --git a/src/UdpSocket.h b/src/UdpSocket.h index a500606..d96e01e 100644 --- a/src/UdpSocket.h +++ b/src/UdpSocket.h @@ -115,7 +115,7 @@ class UdpPacket { void setSize(unsigned newSize); InetAddress &getAddress(); // Not implemented - const UdpPacket &operator=(const UdpPacket&); + const UdpPacket& operator=(const UdpPacket&); private: char *dataBuf; diff --git a/src/dabInput.h b/src/dabInput.h index 685f7b2..12823b8 100644 --- a/src/dabInput.h +++ b/src/dabInput.h @@ -26,8 +26,10 @@ # include "config.h" #endif #include "Log.h" -extern Logger etiLog; +#include "RemoteControl.h" +#include <string> +extern Logger etiLog; struct dabInputOperations { int (*init)(void** args); @@ -44,6 +46,68 @@ struct dabInputOperations { bool operator==(const dabInputOperations&); }; +/* New input object base */ +class DabInputBase { + public: + virtual int open(const std::string name) = 0; + virtual int readFrame(void* buffer, int size) = 0; + virtual int setBitrate(int bitrate) = 0; + virtual int close() = 0; + + virtual ~DabInputBase() {}; + protected: + DabInputBase() {}; +}; + +/* Wrapper class for old-style dabInputOperations inputs */ +class DabInputCompatible : public DabInputBase { + public: + DabInputCompatible(dabInputOperations ops) + : m_ops(ops) + { m_ops.init(&args); } + + virtual ~DabInputCompatible() + { m_ops.clean(&args); } + + virtual int open(const std::string name) + { return m_ops.open(args, name.c_str()); } + + virtual int setbuf(int size) + { return m_ops.setbuf(args, size); } + + virtual int readFrame(void* buffer, int size) + { + if (m_ops.lock) { + m_ops.lock(args); + } + int result = m_ops.readFrame(&m_ops, args, buffer, size); + if (m_ops.unlock) { + m_ops.unlock(args); + } + return result; + } + + virtual int setBitrate(int bitrate) + { return m_ops.setBitrate(&m_ops, args, bitrate); } + + virtual int close() + { return m_ops.close(args); } + + virtual int rewind() + { return m_ops.rewind(args); } + + virtual int read(void* buffer, int size) + { return m_ops.read(args, buffer, size); } + + virtual dabInputOperations getOpts() { return m_ops; } + + private: + DabInputCompatible& operator=(const DabInputCompatible& other); + DabInputCompatible(const DabInputCompatible& other); + + dabInputOperations m_ops; + void* args; +}; int dabInputSetbuf(void* args, int size); int dabInputSetbitrate(dabInputOperations* ops, void* args, int bitrate); diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 580e82d..c84e81c 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -34,7 +34,6 @@ #include "dabInput.h" #include "dabInputZmq.h" -#include "dabInputFifo.h" #include "StatsServer.h" #ifdef HAVE_CONFIG_H @@ -44,182 +43,153 @@ #ifdef HAVE_INPUT_ZEROMQ #include <stdio.h> -#include <zmq.h> +#include <zmq.hpp> #include <list> +#include <exception> #include <string.h> #include <string> +#include <sstream> #include <limits.h> #ifdef __MINGW32__ # define bzero(s, n) memset(s, 0, n) #endif -extern StatsServer global_stats; +extern StatsServer* global_stats; -struct dabInputOperations dabInputZmqOperations = { - dabInputZmqInit, - dabInputZmqOpen, - dabInputSetbuf, - NULL, - NULL, - NULL, - dabInputZmqReadFrame, - dabInputSetbitrate, - dabInputZmqClose, - dabInputZmqClean, - NULL -}; - - -int dabInputZmqInit(void** args) +int DabInputZmq::open(const std::string inputUri) { - dabInputZmqData* input = new dabInputZmqData; - input->zmq_context = zmq_ctx_new(); - if (input->zmq_context == NULL) { - etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno)); - return 1; + // Prepare the ZMQ socket to accept connections + try { + m_zmq_sock.bind(inputUri.c_str()); } - - input->zmq_sock = zmq_socket(input->zmq_context, ZMQ_SUB); - if (input->zmq_sock == NULL) { - etiLog.log(error, "Failed to initialise ZeroMQ socket: %s\n", zmq_strerror(errno)); - return 1; + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ bind for input " << m_name << " failed"; + throw std::runtime_error(os.str()); } - input->prebuffering = INPUT_ZMQ_PREBUFFERING; - - *args = input; - - return 0; -} - - -int dabInputZmqOpen(void* args, const char* inputUri) -{ - dabInputZmqData* input = (dabInputZmqData*)args; - - std::string uri = "tcp://" + std::string(inputUri); - int connect_error = zmq_bind(input->zmq_sock, uri.c_str()); - - if (connect_error < 0) { - etiLog.log(error, "Failed to connect socket to uri '%s': %s\n", uri.c_str(), zmq_strerror(errno)); - return 1; + try { + m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } - - connect_error = zmq_setsockopt(input->zmq_sock, ZMQ_SUBSCRIBE, NULL, 0); - if (connect_error < 0) { - etiLog.log(error, "Failed to subscribe to zmq messages: %s\n", zmq_strerror(errno)); - return 1; + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set socket options for input " << m_name << " failed"; + throw std::runtime_error(os.str()); } - global_stats.registerInput(uri); + // We want to appear in the statistics ! + global_stats->registerInput(m_name); - input->uri = uri; return 0; } - // size corresponds to a frame size. It is constant for a given bitrate -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) +int DabInputZmq::readFrame(void* buffer, int size) { int rc; - dabInputZmqData* input = (dabInputZmqData*)args; /* We must *always* read data from the ZMQ socket, * to make sure that ZMQ internal buffers are emptied * quickly. It's the only way to control the buffers * of the whole path from encoder to our frame_buffer. */ - rc = dabInputZmqReadFromSocket(input, size); + rc = readFromSocket(size); /* Notify of a buffer overrun, and drop some frames */ - if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { - global_stats.notifyOverrun(input->uri); + if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) { + global_stats->notifyOverrun(m_name); /* If the buffer is really too full, we drop as many frames as needed * to get down to the prebuffering size. We would like to have our buffer * filled to the prebuffering length. */ - if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { - size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; + if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) { + size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING; while (over_max--) { - input->frame_buffer.pop_front(); + m_frame_buffer.pop_front(); } } else { /* Our frame_buffer contains DAB logical frames. Five of these make one * AAC superframe. * - * Dropping this superframe amounts to dropping 120ms of audio. */ - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); - input->frame_buffer.pop_front(); + * Dropping this superframe amounts to dropping 120ms of audio. + * + * We're actually not sure to drop five DAB logical frames + * beloning to the same AAC superframe. It is assumed that no + * receiver will crash because of this. At least, the DAB logical frame + * vs. AAC superframe alignment is preserved. + * + * TODO: of course this assumption probably doesn't hold. Fix this ! + * */ + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); + m_frame_buffer.pop_front(); } } - if (input->prebuffering > 0) { + if (m_prebuffering > 0) { if (rc > 0) - input->prebuffering--; - if (input->prebuffering == 0) + m_prebuffering--; + if (m_prebuffering == 0) etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", - input->uri.c_str()); + m_name.c_str()); /* During prebuffering, give a zeroed frame to the mux */ - global_stats.notifyUnderrun(input->uri); + global_stats->notifyUnderrun(m_name); memset(buffer, 0, size); return size; } // Save stats data in bytes, not in frames - global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size); + global_stats->notifyBuffer(m_name, m_frame_buffer.size() * size); - if (input->frame_buffer.empty()) { + if (m_frame_buffer.empty()) { etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", - input->uri.c_str()); + m_name.c_str()); // reset prebuffering - input->prebuffering = INPUT_ZMQ_PREBUFFERING; + m_prebuffering = INPUT_ZMQ_PREBUFFERING; /* We have no data to give, we give a zeroed frame */ - global_stats.notifyUnderrun(input->uri); + global_stats->notifyUnderrun(m_name); memset(buffer, 0, size); return size; } else { /* Normal situation, give a frame from the frame_buffer */ - - char* newframe = input->frame_buffer.front(); + char* newframe = m_frame_buffer.front(); memcpy(buffer, newframe, size); delete[] newframe; - input->frame_buffer.pop_front(); + m_frame_buffer.pop_front(); return size; } } // Read a superframe from the socket, cut it into five frames, and push to list -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) +int DabInputZmq::readFromSocket(int framesize) { int rc; - zmq_msg_t msg; - rc = zmq_msg_init(&msg); - if (rc == -1) { - etiLog.log(error, "Failed to init zmq message: %s\n", zmq_strerror(errno)); - return 0; - } + int nBytes; + zmq::message_t msg; - int nBytes = zmq_msg_recv(&msg, input->zmq_sock, ZMQ_DONTWAIT); - if (nBytes == -1) { - if (errno != EAGAIN) { - etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno)); + try { + nBytes = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (nBytes == 0) { + return 0; } - zmq_msg_close(&msg); - return 0; + } + catch (zmq::error_t& err) + { + etiLog.level(error) << "Failed to receive from zmq socket " << + m_name << ": " << err.what(); } - char* data = (char*)zmq_msg_data(&msg); + char* data = (char*)msg.data(); /* TS 102 563, Section 6: * Audio super frames are transported in five successive DAB logical frames @@ -227,10 +197,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) */ if (nBytes == 5*framesize) { - if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { etiLog.level(warn) << - "inputZMQ " << input->uri << - " buffer full (" << input->frame_buffer.size() << ")," + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," " dropping incoming superframe !"; nBytes = 0; } @@ -241,40 +211,66 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize) framestart += framesize) { char* frame = new char[framesize]; memcpy(frame, framestart, framesize); - input->frame_buffer.push_back(frame); + m_frame_buffer.push_back(frame); } } } else { etiLog.level(error) << - "inputZMQ " << input->uri << - " wrong data size: recv'd" << nBytes << + "inputZMQ " << m_name << + " wrong data size: recv'd " << nBytes << ", need " << 5*framesize << "."; nBytes = 0; } - zmq_msg_close(&msg); return nBytes; } - -int dabInputZmqClose(void* args) +int DabInputZmq::close() { - dabInputZmqData* input = (dabInputZmqData*)args; - zmq_close(input->zmq_sock); + m_zmq_sock.close(); return 0; } +int DabInputZmq::setBitrate(int bitrate) +{ + m_bitrate = bitrate; + return bitrate; // TODO do a nice check here +} -int dabInputZmqClean(void** args) +/********* REMOTE CONTROL ***********/ + +void DabInputZmq::set_parameter(string parameter, string value) { - dabInputZmqData* input = (dabInputZmqData*)(*args); - zmq_ctx_term(input->zmq_context); - delete input; - return 0; + stringstream ss(value); + ss.exceptions ( stringstream::failbit | stringstream::badbit ); + + if (parameter == "buffer") { + throw ParameterError("Parameter 'ntaps' is read-only"); + } + else { + stringstream ss; + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } } +string DabInputZmq::get_parameter(string parameter) +{ + stringstream ss; + if (parameter == "buffer") { + ss << INPUT_ZMQ_MAX_BUFFER_SIZE; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); + +} #endif diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index fdca9e6..bb37a94 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -43,11 +43,10 @@ #ifdef HAVE_INPUT_ZEROMQ -#include <zmq.h> +#include <zmq.hpp> #include <list> #include <string> #include "dabInput.h" -#include "dabInputFifo.h" #include "StatsServer.h" /* The frame_buffer contains DAB logical frames as defined in @@ -62,27 +61,40 @@ #define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms -extern struct dabInputOperations dabInputZmqOperations; - -struct dabInputZmqData { - void* zmq_context; - void* zmq_sock; - std::list<char*> frame_buffer; //stores elements of type char[<framesize>] - int prebuffering; - std::string uri; +class DabInputZmq : public DabInputBase, public RemoteControllable { + public: + DabInputZmq(const std::string name) + : RemoteControllable(name), + m_name(name), m_zmq_context(1), + m_zmq_sock(m_zmq_context, ZMQ_SUB), + m_prebuffering(INPUT_ZMQ_PREBUFFERING), + m_bitrate(0) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [aac superframes]"); + } + + virtual int open(const std::string inputUri); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + /* Remote control */ + virtual void set_parameter(string parameter, string value); + + /* Getting a parameter always returns a string. */ + virtual string get_parameter(string parameter); + + private: + int readFromSocket(int framesize); + + std::string m_name; + zmq::context_t m_zmq_context; + zmq::socket_t m_zmq_sock; // handle for the zmq socket + int m_prebuffering; + std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] + int m_bitrate; }; - -int dabInputZmqInit(void** args); -int dabInputZmqOpen(void* args, const char* inputUri); -int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size); -int dabInputZmqClose(void* args); -int dabInputZmqClean(void** args); - -// Get new message from ZeroMQ -int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize); - - #endif // HAVE_INPUT_ZMQ #endif // DAB_INPUT_ZMQ_H diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 5cb619f..bbf27e3 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -134,13 +134,16 @@ class DabOutputRaw : public DabOutput #ifndef _WIN32 isCyclades_ = other.isCyclades_; #endif - buffer_ = other.buffer_; + buffer_ = new unsigned char[6144]; + memcpy(buffer_, other.buffer_, 6144); } ~DabOutputRaw() { delete[] buffer_; } + const DabOutputRaw operator=(const DabOutputRaw& other); + int Open(const char* name); int Write(void* buffer, int size); int Close(); @@ -164,11 +167,11 @@ class DabOutputUdp : public DabOutput socket_ = new UdpSocket(); } - DabOutputUdp(const DabOutputUdp& other) - { - packet_ = other.packet_; - socket_ = other.socket_; - } + // make sure we don't copy this output around + // the UdpPacket and UdpSocket do not support + // copying either + DabOutputUdp(const DabOutputUdp& other); + DabOutputUdp operator=(const DabOutputUdp& other); ~DabOutputUdp() { delete socket_; @@ -195,12 +198,8 @@ class DabOutputTcp : public DabOutput client = NULL; } - DabOutputTcp(const DabOutputTcp& other) - { - server = other.server; - client = other.client; - thread_ = other.thread_; - } + DabOutputTcp(const DabOutputTcp& other); + DabOutputTcp operator=(const DabOutputTcp& other); ~DabOutputTcp() { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index 459120f..607fe71 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,8 +38,7 @@ # include <sys/types.h> # include <sys/socket.h> # include <sys/ioctl.h> -# include <linux/if_packet.h> -# include <linux/netdevice.h> +# include <net/if_packet.h> # include <net/if_arp.h> #endif |