aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/DabMux.cpp41
-rw-r--r--src/Log.cpp2
-rw-r--r--src/Log.h2
-rw-r--r--src/Makefile.am1
-rw-r--r--src/MuxElements.h3
-rw-r--r--src/ParserCmdline.cpp57
-rw-r--r--src/ParserConfigfile.cpp120
-rw-r--r--src/ParserConfigfile.h8
-rw-r--r--src/RemoteControl.cpp226
-rw-r--r--src/RemoteControl.h233
-rw-r--r--src/StatsServer.h4
-rw-r--r--src/UdpSocket.h2
-rw-r--r--src/dabInput.h66
-rw-r--r--src/dabInputZmq.cpp220
-rw-r--r--src/dabInputZmq.h54
-rw-r--r--src/dabOutput/dabOutput.h23
-rw-r--r--src/dabOutput/dabOutputUdp.cpp3
17 files changed, 820 insertions, 245 deletions
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index f3edd39..4e10287 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -74,8 +74,7 @@ typedef DWORD32 uint32_t;
# include <sys/times.h>
# include <sys/resource.h>
-# include <linux/if_packet.h>
-# include <linux/netdevice.h>
+# include <net/if_packet.h>
#endif
#ifdef _WIN32
@@ -119,11 +118,12 @@ typedef DWORD32 uint32_t;
#include "ParserConfigfile.h"
#include "StatsServer.h"
#include "Log.h"
+#include "RemoteControl.h"
using namespace std;
/* Global stats server */
-StatsServer global_stats(12720); //TODO define port
+StatsServer* global_stats;
static unsigned char Padding_FIB[] = {
0xff, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
@@ -288,6 +288,8 @@ int main(int argc, char *argv[])
vector<dabOutput*>::iterator output;
dabProtection* protection = NULL;
+ BaseRemoteController* rc;
+
unsigned int currentFrame;
int returnCode = 0;
int result;
@@ -312,6 +314,8 @@ int main(int argc, char *argv[])
bool enableTist = false;
unsigned timestamp = 0;
+ int statsserverport = 0;
+
unsigned long time_seconds = 0;
struct timeval mnsc_time;
@@ -337,7 +341,7 @@ int main(int argc, char *argv[])
string conf_file = argv[2];
parse_configfile(conf_file, outputs, ensemble, &enableTist, &FICL,
- &factumAnalyzer, &limit);
+ &factumAnalyzer, &limit, rc, &statsserverport);
printSubchannels(ensemble->subchannels);
cerr << endl;
@@ -361,6 +365,13 @@ int main(int argc, char *argv[])
}
}
+ if (statsserverport != 0) {
+ global_stats = new StatsServer(statsserverport);
+ }
+ else {
+ global_stats = new StatsServer();
+ }
+
etiLog.log(info, "CRC-DABMUX starting up");
@@ -531,17 +542,14 @@ int main(int argc, char *argv[])
(*subchannel)->startAddress = (*(subchannel - 1))->startAddress +
getSizeCu(*(subchannel - 1));
}
- if ((*subchannel)->operations.open((*subchannel)->data,
- (*subchannel)->inputName) == -1) {
+ if ((*subchannel)->input->open((*subchannel)->inputName) == -1) {
perror((*subchannel)->inputName);
returnCode = -1;
goto EXIT;
}
// TODO Check errors
- int result = (*subchannel)->operations.setBitrate(
- &(*subchannel)->operations, (*subchannel)->data,
- (*subchannel)->bitrate);
+ result = (*subchannel)->input->setBitrate( (*subchannel)->bitrate);
if (result <= 0) {
etiLog.log(error, "can't set bitrate for source %s\n",
(*subchannel)->inputName);
@@ -1718,15 +1726,8 @@ int main(int argc, char *argv[])
subchannel != ensemble->subchannels.end();
++subchannel) {
int sizeSubchannel = getSizeByte(*subchannel);
- if ((*subchannel)->operations.lock != NULL) {
- (*subchannel)->operations.lock((*subchannel)->data);
- }
- result = (*subchannel)->operations.readFrame(
- &(*subchannel)->operations,
- (*subchannel)->data, &etiFrame[index], sizeSubchannel);
- if ((*subchannel)->operations.unlock != NULL) {
- (*subchannel)->operations.unlock((*subchannel)->data);
- }
+ result = (*subchannel)->input->readFrame(
+ &etiFrame[index], sizeSubchannel);
if (result < 0) {
etiLog.log(info, "Subchannel %d read failed at ETI frame number: %i\n", (*subchannel)->id, currentFrame);
}
@@ -1823,8 +1824,8 @@ EXIT:
for (subchannel = ensemble->subchannels.begin();
subchannel != ensemble->subchannels.end();
++subchannel) {
- (*subchannel)->operations.close((*subchannel)->data);
- (*subchannel)->operations.clean(&(*subchannel)->data);
+ (*subchannel)->input->close();
+ delete (*subchannel)->input;
}
for (output = outputs.begin() ; output != outputs.end(); ++output) {
if ((*output)->output) {
diff --git a/src/Log.cpp b/src/Log.cpp
index 30cc1f7..e428a0b 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -70,7 +70,7 @@ void Logger::logstr(log_level_t level, std::string message)
for (std::list<LogBackend*>::iterator it = backends.begin();
it != backends.end();
- it++) {
+ ++it) {
(*it)->log(level, message);
}
diff --git a/src/Log.h b/src/Log.h
index 5c8c4b3..4f911dc 100644
--- a/src/Log.h
+++ b/src/Log.h
@@ -135,7 +135,7 @@ class Logger {
/* Log the message to all backends */
void log(log_level_t level, const char* fmt, ...);
- void logstr(log_level_t level, const std::string message);
+ void logstr(log_level_t level, std::string message);
/* Return a LogLine for the given level
* so that you can write etiLog.level(info) << "stuff = " << 21 */
diff --git a/src/Makefile.am b/src/Makefile.am
index 910d5bc..3bcf54e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -71,6 +71,7 @@ crc_dabmux_SOURCES =DabMux.cpp \
bridge.h bridge.c \
utils.cpp utils.h \
MuxElements.cpp MuxElements.h \
+ RemoteControl.cpp RemoteControl.h \
ParserCmdline.cpp ParserCmdline.h \
ParserConfigfile.cpp ParserConfigfile.h \
Eti.h Eti.cpp \
diff --git a/src/MuxElements.h b/src/MuxElements.h
index 539d955..5556c0e 100644
--- a/src/MuxElements.h
+++ b/src/MuxElements.h
@@ -96,8 +96,7 @@ struct dabProtection {
struct dabSubchannel {
const char* inputProto;
const char* inputName;
- void* data;
- dabInputOperations operations;
+ DabInputBase* input;
unsigned char id;
unsigned char type;
uint16_t startAddress;
diff --git a/src/ParserCmdline.cpp b/src/ParserCmdline.cpp
index 44736c3..969f98e 100644
--- a/src/ParserCmdline.cpp
+++ b/src/ParserCmdline.cpp
@@ -107,6 +107,8 @@ bool parse_cmdline(char **argv,
vector<dabService*>::iterator service = ensemble->services.end();
dabProtection* protection = NULL;
+ dabInputOperations operations;
+
int scids_temp = 0;
char* progName = strrchr(argv[0], '/');
@@ -217,7 +219,7 @@ bool parse_cmdline(char **argv,
(*subchannel)->inputProto = "file";
(*subchannel)->type = 0;
(*subchannel)->bitrate = 0;
- (*subchannel)->operations = dabInputMpegFileOperations;
+ operations = dabInputMpegFileOperations;
#endif // defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_MPEG)
#if defined(HAVE_FORMAT_DABPLUS)
} else if (c == 'F') {
@@ -238,7 +240,7 @@ bool parse_cmdline(char **argv,
if (0) {
#if defined(HAVE_INPUT_FILE)
} else if (strcmp((*subchannel)->inputProto, "file") == 0) {
- (*subchannel)->operations = dabInputDabplusFileOperations;
+ operations = dabInputDabplusFileOperations;
#endif // defined(HAVE_INPUT_FILE)
} else {
etiLog.log(error,
@@ -263,11 +265,11 @@ bool parse_cmdline(char **argv,
#if defined(HAVE_FORMAT_BRIDGE)
#if defined(HAVE_INPUT_UDP)
} else if (strcmp((*subchannel)->inputProto, "udp") == 0) {
- (*subchannel)->operations = dabInputBridgeUdpOperations;
+ operations = dabInputBridgeUdpOperations;
#endif // defined(HAVE_INPUT_UDP)
#if defined(HAVE_INPUT_SLIP)
} else if (strcmp((*subchannel)->inputProto, "slip") == 0) {
- (*subchannel)->operations = dabInputSlipOperations;
+ operations = dabInputSlipOperations;
#endif // defined(HAVE_INPUT_SLIP)
#endif // defined(HAVE_FORMAT_BRIDGE)
}
@@ -285,18 +287,18 @@ bool parse_cmdline(char **argv,
if (0) {
#if defined(HAVE_INPUT_UDP)
} else if (strcmp((*subchannel)->inputProto, "udp") == 0) {
- (*subchannel)->operations = dabInputUdpOperations;
+ operations = dabInputUdpOperations;
#endif
#if defined(HAVE_INPUT_PRBS) && defined(HAVE_FORMAT_RAW)
} else if (strcmp((*subchannel)->inputProto, "prbs") == 0) {
- (*subchannel)->operations = dabInputPrbsOperations;
+ operations = dabInputPrbsOperations;
#endif
#if defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_RAW)
} else if (strcmp((*subchannel)->inputProto, "file") == 0) {
- (*subchannel)->operations = dabInputRawFileOperations;
+ operations = dabInputRawFileOperations;
#endif
} else if (strcmp((*subchannel)->inputProto, "fifo") == 0) {
- (*subchannel)->operations = dabInputRawFifoOperations;
+ operations = dabInputRawFifoOperations;
} else {
etiLog.log(error,
"Invalid protocol for data input (%s)\n",
@@ -312,7 +314,7 @@ bool parse_cmdline(char **argv,
(*subchannel)->inputProto = "test";
(*subchannel)->type = 1;
(*subchannel)->bitrate = DEFAULT_DATA_BITRATE;
- (*subchannel)->operations = dabInputTestOperations;
+ operations = dabInputTestOperations;
#endif // defined(HAVE_INPUT_TEST)) && defined(HAVE_FORMAT_RAW)
#ifdef HAVE_FORMAT_PACKET
} else if (c == 'P') {
@@ -320,9 +322,9 @@ bool parse_cmdline(char **argv,
(*subchannel)->type = 3;
(*subchannel)->bitrate = DEFAULT_PACKET_BITRATE;
#ifdef HAVE_INPUT_FILE
- (*subchannel)->operations = dabInputPacketFileOperations;
+ operations = dabInputPacketFileOperations;
#elif defined(HAVE_INPUT_FIFO)
- (*subchannel)->operations = dabInputFifoOperations;
+ operations = dabInputFifoOperations;
#else
# pragma error("Must defined at least one packet input")
#endif // defined(HAVE_INPUT_FILE)
@@ -331,7 +333,7 @@ bool parse_cmdline(char **argv,
(*subchannel)->inputProto = "file";
(*subchannel)->type = 3;
(*subchannel)->bitrate = DEFAULT_PACKET_BITRATE;
- (*subchannel)->operations = dabInputEnhancedPacketFileOperations;
+ operations = dabInputEnhancedPacketFileOperations;
#endif // defined(HAVE_FORMAT_EPM)
#endif // defined(HAVE_FORMAT_PACKET)
#ifdef HAVE_FORMAT_DMB
@@ -347,9 +349,9 @@ bool parse_cmdline(char **argv,
*proto = 0;
}
if (strcmp((*subchannel)->inputProto, "udp") == 0) {
- (*subchannel)->operations = dabInputDmbUdpOperations;
+ operations = dabInputDmbUdpOperations;
} else if (strcmp((*subchannel)->inputProto, "file") == 0) {
- (*subchannel)->operations = dabInputDmbFileOperations;
+ operations = dabInputDmbFileOperations;
} else {
etiLog.log(error,
"Invalid protocol for DMB input (%s)\n",
@@ -366,7 +368,8 @@ bool parse_cmdline(char **argv,
"Service '%c' not yet coded!\n", c);
goto EXIT;
}
- (*subchannel)->operations.init(&(*subchannel)->data);
+ (*subchannel)->input = new DabInputCompatible(operations);
+
for (int i = 0; i < 64; ++i) { // Find first free subchannel
subchannel = getSubchannel(ensemble->subchannels, i);
if (subchannel == ensemble->subchannels.end()) {
@@ -588,36 +591,36 @@ bool parse_cmdline(char **argv,
switch ((*subchannel)->type) {
#ifdef HAVE_FORMAT_PACKET
case 3:
- (*subchannel)->operations.clean(&(*subchannel)->data);
- if ((*subchannel)->operations == dabInputPacketFileOperations) {
- (*subchannel)->operations = dabInputFifoOperations;
+ if ( ((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputPacketFileOperations) {
+ operations = dabInputFifoOperations;
#ifdef HAVE_FORMAT_EPM
- } else if ((*subchannel)->operations == dabInputEnhancedPacketFileOperations) {
- (*subchannel)->operations = dabInputEnhancedFifoOperations;
+ } else if ( ((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputEnhancedPacketFileOperations) {
+ operations = dabInputEnhancedFifoOperations;
#endif // defined(HAVE_FORMAT_EPM)
} else {
etiLog.log(error,
"Error, wrong packet subchannel operations!\n");
goto EXIT;
}
- (*subchannel)->operations.init(&(*subchannel)->data);
+ delete (*subchannel)->input;
+ (*subchannel)->input = new DabInputCompatible(operations);
(*subchannel)->inputProto = "fifo";
break;
#endif // defined(HAVE_FORMAT_PACKET)
#ifdef HAVE_FORMAT_MPEG
case 0:
- (*subchannel)->operations.clean(&(*subchannel)->data);
- if ((*subchannel)->operations == dabInputMpegFileOperations) {
- (*subchannel)->operations = dabInputMpegFifoOperations;
- } else if ((*subchannel)->operations ==
+ if ( ((DabInputCompatible*)(*subchannel)->input)->getOpts() == dabInputMpegFileOperations) {
+ operations = dabInputMpegFifoOperations;
+ } else if (((DabInputCompatible*)(*subchannel)->input)->getOpts() ==
dabInputDabplusFileOperations) {
- (*subchannel)->operations = dabInputDabplusFifoOperations;
+ operations = dabInputDabplusFifoOperations;
} else {
etiLog.log(error,
"Error, wrong audio subchannel operations!\n");
goto EXIT;
}
- (*subchannel)->operations.init(&(*subchannel)->data);
+ delete (*subchannel)->input;
+ (*subchannel)->input = new DabInputCompatible(operations);
(*subchannel)->inputProto = "fifo";
break;
#endif // defined(HAVE_FORMAT_MPEG)
diff --git a/src/ParserConfigfile.cpp b/src/ParserConfigfile.cpp
index ca9983b..3ac6537 100644
--- a/src/ParserConfigfile.cpp
+++ b/src/ParserConfigfile.cpp
@@ -63,6 +63,7 @@
#include "dabInputDmbUdp.h"
#include "dabInputZmq.h"
#include "DabMux.h"
+#include "StatsServer.h"
#ifdef _WIN32
@@ -144,7 +145,9 @@ void parse_configfile(string configuration_file,
bool* enableTist,
unsigned* FICL,
bool* factumAnalyzer,
- unsigned long* limit
+ unsigned long* limit,
+ BaseRemoteController* rc,
+ int* statsServerPort
)
{
using boost::property_tree::ptree;
@@ -183,6 +186,19 @@ void parse_configfile(string configuration_file,
*enableTist = pt_general.get("tist", false);
+ *statsServerPort = pt_general.get<int>("statsserverport", 0);
+
+ /************** READ REMOTE CONTROL PARAMETERS *************/
+ ptree pt_rc = pt.get_child("remotecontrol");
+ int telnetport = pt_rc.get<int>("telnetport", 0);
+
+ if (telnetport != 0) {
+ rc = new RemoteControllerTelnet(telnetport);
+ }
+ else {
+ rc = new RemoteControllerDummy();
+ }
+
/******************** READ ENSEMBLE PARAMETERS *************/
ptree pt_ensemble = pt.get_child("ensemble");
@@ -213,7 +229,7 @@ void parse_configfile(string configuration_file,
ptree pt_services = pt.get_child("services");
for (ptree::iterator it = pt_services.begin();
- it != pt_services.end(); it++) {
+ it != pt_services.end(); ++it) {
string serviceuid = it->first;
ptree pt_service = it->second;
dabService* service = new dabService();
@@ -264,14 +280,14 @@ void parse_configfile(string configuration_file,
map<string, dabSubchannel*> allsubchans;
ptree pt_subchans = pt.get_child("subchannels");
- for (ptree::iterator it = pt_subchans.begin(); it != pt_subchans.end(); it++) {
+ for (ptree::iterator it = pt_subchans.begin(); it != pt_subchans.end(); ++it) {
string subchanuid = it->first;
dabSubchannel* subchan = new dabSubchannel();
ensemble->subchannels.push_back(subchan);
try {
- setup_subchannel_from_ptree(subchan, it->second, ensemble, subchanuid);
+ setup_subchannel_from_ptree(subchan, it->second, ensemble, subchanuid, rc);
}
catch (runtime_error &e) {
etiLog.log(error,
@@ -294,7 +310,7 @@ void parse_configfile(string configuration_file,
/******************** READ COMPONENT PARAMETERS ************/
map<string, dabComponent*> allcomponents;
ptree pt_components = pt.get_child("components");
- for (ptree::iterator it = pt_components.begin(); it != pt_components.end(); it++) {
+ for (ptree::iterator it = pt_components.begin(); it != pt_components.end(); ++it) {
string componentuid = it->first;
ptree pt_comp = it->second;
@@ -371,7 +387,7 @@ void parse_configfile(string configuration_file,
/******************** READ OUTPUT PARAMETERS ***************/
map<string, dabOutput*> alloutputs;
ptree pt_outputs = pt.get_child("outputs");
- for (ptree::iterator it = pt_outputs.begin(); it != pt_outputs.end(); it++) {
+ for (ptree::iterator it = pt_outputs.begin(); it != pt_outputs.end(); ++it) {
string outputuid = it->first;
string uri = pt_outputs.get<string>(outputuid);
@@ -409,7 +425,8 @@ void parse_configfile(string configuration_file,
void setup_subchannel_from_ptree(dabSubchannel* subchan,
boost::property_tree::ptree &pt,
dabEnsemble* ensemble,
- string subchanuid)
+ string subchanuid,
+ BaseRemoteController* rc)
{
using boost::property_tree::ptree;
using boost::property_tree::ptree_error;
@@ -444,7 +461,12 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
subchan->inputName = inputName;
-
+ /* The input is of the old_style type,
+ * with the struct of function pointers,
+ * and needs to be a DabInputCompatible
+ */
+ bool input_is_old_style = true;
+ dabInputOperations operations;
dabProtection* protection = &subchan->protection;
if (0) {
@@ -453,7 +475,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
subchan->inputProto = "file";
subchan->type = 0;
subchan->bitrate = 0;
- subchan->operations = dabInputMpegFileOperations;
+ operations = dabInputMpegFileOperations;
#endif // defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_MPEG)
#if defined(HAVE_FORMAT_DABPLUS)
} else if (type == "dabplus") {
@@ -462,6 +484,10 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
char* proto;
+ char* full_inputName = new char[256];
+ full_inputName[255] = '\0';
+ memcpy(full_inputName, inputName, 255);
+
proto = strstr(inputName, "://");
if (proto == NULL) {
subchan->inputProto = "file";
@@ -474,22 +500,32 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
if (0) {
#if defined(HAVE_INPUT_FILE)
} else if (strcmp(subchan->inputProto, "file") == 0) {
- subchan->operations = dabInputDabplusFileOperations;
+ operations = dabInputDabplusFileOperations;
#endif // defined(HAVE_INPUT_FILE)
#if defined(HAVE_INPUT_ZEROMQ)
}
else if (strcmp(subchan->inputProto, "tcp") == 0) {
- subchan->operations = dabInputZmqOperations;
+ input_is_old_style = false;
+ DabInputZmq* inzmq = new DabInputZmq(subchanuid);
+ inzmq->enrol_at(*rc);
+ subchan->input = inzmq;
+ subchan->inputName = full_inputName;
}
else if (strcmp(subchan->inputProto, "epmg") == 0) {
- etiLog.log(warn,
- "Using untested epmg:// zeromq input\n");
- subchan->operations = dabInputZmqOperations;
+ etiLog.level(warn) << "Using untested epmg:// zeromq input";
+ input_is_old_style = false;
+ DabInputZmq* inzmq = new DabInputZmq(subchanuid);
+ inzmq->enrol_at(*rc);
+ subchan->input = inzmq;
+ subchan->inputName = full_inputName;
}
else if (strcmp(subchan->inputProto, "ipc") == 0) {
- etiLog.log(warn,
- "Using untested ipc:// zeromq input\n");
- subchan->operations = dabInputZmqOperations;
+ etiLog.level(warn) << "Using untested ipc:// zeromq input";
+ input_is_old_style = false;
+ DabInputZmq* inzmq = new DabInputZmq(subchanuid);
+ inzmq->enrol_at(*rc);
+ subchan->input = inzmq;
+ subchan->inputName = full_inputName;
#endif // defined(HAVE_INPUT_ZEROMQ)
} else {
stringstream ss;
@@ -514,11 +550,11 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
#if defined(HAVE_FORMAT_BRIDGE)
#if defined(HAVE_INPUT_UDP)
} else if (strcmp(subchan->inputProto, "udp") == 0) {
- subchan->operations = dabInputBridgeUdpOperations;
+ operations = dabInputBridgeUdpOperations;
#endif // defined(HAVE_INPUT_UDP)
#if defined(HAVE_INPUT_SLIP)
} else if (strcmp(subchan->inputProto, "slip") == 0) {
- subchan->operations = dabInputSlipOperations;
+ operations = dabInputSlipOperations;
#endif // defined(HAVE_INPUT_SLIP)
#endif // defined(HAVE_FORMAT_BRIDGE)
}
@@ -536,18 +572,18 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
if (0) {
#if defined(HAVE_INPUT_UDP)
} else if (strcmp(subchan->inputProto, "udp") == 0) {
- subchan->operations = dabInputUdpOperations;
+ operations = dabInputUdpOperations;
#endif
#if defined(HAVE_INPUT_PRBS) && defined(HAVE_FORMAT_RAW)
} else if (strcmp(subchan->inputProto, "prbs") == 0) {
- subchan->operations = dabInputPrbsOperations;
+ operations = dabInputPrbsOperations;
#endif
#if defined(HAVE_INPUT_FILE) && defined(HAVE_FORMAT_RAW)
} else if (strcmp(subchan->inputProto, "file") == 0) {
- subchan->operations = dabInputRawFileOperations;
+ operations = dabInputRawFileOperations;
#endif
} else if (strcmp(subchan->inputProto, "fifo") == 0) {
- subchan->operations = dabInputRawFifoOperations;
+ operations = dabInputRawFifoOperations;
} else {
stringstream ss;
ss << "Subchannel with uid " << subchanuid <<
@@ -563,7 +599,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
subchan->inputProto = "test";
subchan->type = 1;
subchan->bitrate = DEFAULT_DATA_BITRATE;
- subchan->operations = dabInputTestOperations;
+ operations = dabInputTestOperations;
#endif // defined(HAVE_INPUT_TEST)) && defined(HAVE_FORMAT_RAW)
#ifdef HAVE_FORMAT_PACKET
} else if (type == "packet") {
@@ -571,9 +607,9 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
subchan->type = 3;
subchan->bitrate = DEFAULT_PACKET_BITRATE;
#ifdef HAVE_INPUT_FILE
- subchan->operations = dabInputPacketFileOperations;
+ operations = dabInputPacketFileOperations;
#elif defined(HAVE_INPUT_FIFO)
- subchan->operations = dabInputFifoOperations;
+ operations = dabInputFifoOperations;
#else
# pragma error("Must defined at least one packet input")
#endif // defined(HAVE_INPUT_FILE)
@@ -582,7 +618,7 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
subchan->inputProto = "file";
subchan->type = 3;
subchan->bitrate = DEFAULT_PACKET_BITRATE;
- subchan->operations = dabInputEnhancedPacketFileOperations;
+ operations = dabInputEnhancedPacketFileOperations;
#endif // defined(HAVE_FORMAT_EPM)
#endif // defined(HAVE_FORMAT_PACKET)
#ifdef HAVE_FORMAT_DMB
@@ -598,9 +634,9 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
*proto = 0;
}
if (strcmp(subchan->inputProto, "udp") == 0) {
- subchan->operations = dabInputDmbUdpOperations;
+ operations = dabInputDmbUdpOperations;
} else if (strcmp(subchan->inputProto, "file") == 0) {
- subchan->operations = dabInputDmbFileOperations;
+ operations = dabInputDmbFileOperations;
} else {
stringstream ss;
ss << "Subchannel with uid " << subchanuid <<
@@ -617,7 +653,6 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
ss << "Subchannel with uid " << subchanuid << " has unknown type!";
throw runtime_error(ss.str());
}
- subchan->operations.init(&subchan->data);
subchan->startAddress = 0;
if (type == "audio") {
@@ -649,37 +684,33 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
switch (subchan->type) {
#ifdef HAVE_FORMAT_PACKET
case 3:
- subchan->operations.clean(&subchan->data);
- if (subchan->operations == dabInputPacketFileOperations) {
- subchan->operations = dabInputFifoOperations;
+ if (operations == dabInputPacketFileOperations) {
+ operations = dabInputFifoOperations;
#ifdef HAVE_FORMAT_EPM
- } else if (subchan->operations == dabInputEnhancedPacketFileOperations) {
- subchan->operations = dabInputEnhancedFifoOperations;
+ } else if (operations == dabInputEnhancedPacketFileOperations) {
+ operations = dabInputEnhancedFifoOperations;
#endif // defined(HAVE_FORMAT_EPM)
} else {
etiLog.log(error,
"Error, wrong packet subchannel operations!\n");
throw runtime_error("Error, wrong packet subchannel operations!\n");
}
- subchan->operations.init(&subchan->data);
subchan->inputProto = "fifo";
break;
#endif // defined(HAVE_FORMAT_PACKET)
#ifdef HAVE_FORMAT_MPEG
case 0:
- subchan->operations.clean(&subchan->data);
- if (subchan->operations == dabInputMpegFileOperations) {
- subchan->operations = dabInputMpegFifoOperations;
- } else if (subchan->operations ==
+ if (operations == dabInputMpegFileOperations) {
+ operations = dabInputMpegFifoOperations;
+ } else if (operations ==
dabInputDabplusFileOperations) {
- subchan->operations = dabInputDabplusFifoOperations;
+ operations = dabInputDabplusFifoOperations;
} else {
etiLog.log(error,
"Error, wrong audio subchannel operations!\n");
throw runtime_error(
"Error, wrong audio subchannel operations!\n");
}
- subchan->operations.init(&subchan->data);
subchan->inputProto = "fifo";
break;
#endif // defined(HAVE_FORMAT_MPEG)
@@ -735,4 +766,9 @@ void setup_subchannel_from_ptree(dabSubchannel* subchan,
}
catch (ptree_error &e) {}
+ /* Create object */
+ if (input_is_old_style) {
+ subchan->input = new DabInputCompatible(operations);
+ }
+ // else { it's already been created! }
}
diff --git a/src/ParserConfigfile.h b/src/ParserConfigfile.h
index a09d1fc..f91a545 100644
--- a/src/ParserConfigfile.h
+++ b/src/ParserConfigfile.h
@@ -41,13 +41,15 @@ void parse_configfile(std::string configuration_file,
bool* enableTist,
unsigned* FICL,
bool* factumAnalyzer,
- unsigned long* limit
- );
+ unsigned long* limit,
+ BaseRemoteController* rc,
+ int* statsServerPort);
void setup_subchannel_from_ptree(dabSubchannel* subchan,
boost::property_tree::ptree &pt,
dabEnsemble* ensemble,
- std::string uid);
+ string subchanuid,
+ BaseRemoteController* rc);
#endif
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
new file mode 100644
index 0000000..7cc975a
--- /dev/null
+++ b/src/RemoteControl.cpp
@@ -0,0 +1,226 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012
+ */
+/*
+ This file is part of CRC-DabMux.
+
+ CRC-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <list>
+#include <string>
+#include <string>
+#include <boost/asio.hpp>
+#include "Log.h"
+
+#include "RemoteControl.h"
+
+using boost::asio::ip::tcp;
+
+void
+RemoteControllerTelnet::process(long)
+{
+ m_welcome = "CRC-DABMUX Remote Control CLI\nWrite 'help' for help.\n**********\n";
+ m_prompt = "> ";
+
+ std::string in_message;
+ size_t length;
+
+ try {
+ boost::asio::io_service io_service;
+ tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), m_port));
+
+ while (m_running) {
+ in_message = "";
+
+ tcp::socket socket(io_service);
+
+ acceptor.accept(socket);
+
+ boost::system::error_code ignored_error;
+
+ boost::asio::write(socket, boost::asio::buffer(m_welcome),
+ boost::asio::transfer_all(),
+ ignored_error);
+
+ while (m_running && in_message != "quit") {
+ boost::asio::write(socket, boost::asio::buffer(m_prompt),
+ boost::asio::transfer_all(),
+ ignored_error);
+
+ in_message = "";
+
+ boost::asio::streambuf buffer;
+ length = boost::asio::read_until( socket, buffer, "\n", ignored_error);
+
+ std::istream str(&buffer);
+ std::getline(str, in_message);
+
+ if (length == 0) {
+ etiLog.level(info) << "RC: Connection terminated";
+ break;
+ }
+
+ while (in_message.length() > 0 &&
+ (in_message[in_message.length()-1] == '\r' ||
+ in_message[in_message.length()-1] == '\n')) {
+ in_message.erase(in_message.length()-1, 1);
+ }
+
+ if (in_message.length() == 0) {
+ continue;
+ }
+
+ etiLog.level(info) << "RC: Got message '" << in_message << "'";
+
+ dispatch_command(socket, in_message);
+ }
+ etiLog.level(info) << "RC: Closing socket";
+ socket.close();
+ }
+ }
+ catch (std::exception& e)
+ {
+ etiLog.level(error) << "Remote control caught exception: " << e.what();
+ }
+}
+
+void
+RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
+{
+ vector<string> cmd = tokenise_(command);
+
+ if (cmd[0] == "help") {
+ reply(socket,
+ "The following commands are supported:\n"
+ " list\n"
+ " * Lists the modules that are loaded\n"
+ " list MODULE\n"
+ " * Lists the parameters exported by module MODULE\n"
+ " show MODULE\n"
+ " * Lists all parameters and their values from module MODULE\n"
+ " get MODULE PARAMETER\n"
+ " * Gets the value for the specified PARAMETER from module MODULE\n"
+ " set MODULE PARAMETER VALUE\n"
+ " * Sets the value for the PARAMETER ofr module MODULE\n"
+ " quit\n"
+ " * Terminate this session\n"
+ "\n");
+ }
+ else if (cmd[0] == "list") {
+ stringstream ss;
+
+ if (cmd.size() == 1) {
+ for (list<RemoteControllable*>::iterator it = m_cohort.begin();
+ it != m_cohort.end(); ++it) {
+ ss << (*it)->get_rc_name() << " ";
+ }
+ }
+ else if (cmd.size() == 2) {
+ try {
+ stringstream ss;
+
+ list< vector<string> > params = get_parameter_descriptions_(cmd[1]);
+ for (list< vector<string> >::iterator it = params.begin();
+ it != params.end(); ++it) {
+ ss << (*it)[0] << " : " << (*it)[1] << endl;
+ }
+ reply(socket, ss.str());
+ }
+ catch (ParameterError &e) {
+ reply(socket, e.what());
+ }
+ }
+ else {
+ reply(socket, "Too many arguments for command 'list'");
+ }
+
+ reply(socket, ss.str());
+ }
+ else if (cmd[0] == "show") {
+ if (cmd.size() == 2) {
+ try {
+ stringstream ss;
+ list< vector<string> > r = get_param_list_values_(cmd[1]);
+ for (list< vector<string> >::iterator it = r.begin();
+ it != r.end(); ++it) {
+ ss << (*it)[0] << ": " << (*it)[1] << endl;
+ }
+ reply(socket, ss.str());
+
+ }
+ catch (ParameterError &e) {
+ reply(socket, e.what());
+ }
+ }
+ else
+ {
+ reply(socket, "Incorrect parameters for command 'show'");
+ }
+ }
+ else if (cmd[0] == "get") {
+ if (cmd.size() == 3) {
+ try {
+ string r = get_param_(cmd[1], cmd[2]);
+ reply(socket, r);
+ }
+ catch (ParameterError &e) {
+ reply(socket, e.what());
+ }
+ }
+ else
+ {
+ reply(socket, "Incorrect parameters for command 'get'");
+ }
+ }
+ else if (cmd[0] == "set") {
+ if (cmd.size() == 4) {
+ try {
+ set_param_(cmd[1], cmd[2], cmd[3]);
+ reply(socket, "ok");
+ }
+ catch (ParameterError &e) {
+ reply(socket, e.what());
+ }
+ catch (exception &e) {
+ reply(socket, "Error: Invalid parameter value. ");
+ }
+ }
+ else
+ {
+ reply(socket, "Incorrect parameters for command 'get'");
+ }
+ }
+ else if (cmd[0] == "quit") {
+ reply(socket, "Goodbye");
+ }
+ else {
+ reply(socket, "Message not understood");
+ }
+}
+
+void
+RemoteControllerTelnet::reply(tcp::socket& socket, string message)
+{
+ boost::system::error_code ignored_error;
+ stringstream ss;
+ ss << message << "\r\n";
+ boost::asio::write(socket, boost::asio::buffer(ss.str()),
+ boost::asio::transfer_all(),
+ ignored_error);
+}
+
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
new file mode 100644
index 0000000..d0965e0
--- /dev/null
+++ b/src/RemoteControl.h
@@ -0,0 +1,233 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012
+
+ This module adds remote-control capability to some of the dabmod modules.
+ see testremotecontrol/test.cpp for an example of how to use this.
+ */
+/*
+ This file is part of CRC-DabMux.
+
+ CRC-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _REMOTECONTROL_H
+#define _REMOTECONTROL_H
+
+#include <list>
+#include <map>
+#include <string>
+#include <string>
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/asio.hpp>
+#include <boost/foreach.hpp>
+#include <boost/tokenizer.hpp>
+#include <boost/thread.hpp>
+#include <stdexcept>
+
+
+#define RC_ADD_PARAMETER(p, desc) { \
+ vector<string> p; \
+ p.push_back(#p); \
+ p.push_back(desc); \
+ m_parameters.push_back(p); \
+}
+
+
+using namespace std;
+using boost::asio::ip::tcp;
+
+class ParameterError : public std::exception
+{
+ public:
+ ParameterError(string message) : m_message(message) {}
+ ~ParameterError() throw() {};
+ const char* what() const throw() { return m_message.c_str(); }
+
+ private:
+ string m_message;
+};
+
+class RemoteControllable;
+
+/* Remote controllers (that recieve orders from the user) must implement BaseRemoteController */
+class BaseRemoteController {
+ public:
+ /* Add a new controllable under this controller's command */
+ virtual void enrol(RemoteControllable* controllable) = 0;
+};
+
+/* Objects that support remote control must implement the following class */
+class RemoteControllable {
+ public:
+
+ RemoteControllable(string name) : m_name(name) {}
+
+ /* return a short name used to identify the controllable.
+ * It might be used in the commands the user has to type, so keep
+ * it short
+ */
+ virtual std::string get_rc_name() { return m_name; }
+
+ /* Tell the controllable to enrol at the given controller */
+ virtual void enrol_at(BaseRemoteController& controller) {
+ controller.enrol(this);
+ }
+
+ /* Return a list of possible parameters that can be set */
+ virtual list<string> get_supported_parameters() {
+ list<string> parameterlist;
+ for (list< vector<string> >::iterator it = m_parameters.begin();
+ it != m_parameters.end(); ++it) {
+ parameterlist.push_back((*it)[0]);
+ }
+ return parameterlist;
+ }
+
+ /* Return a mapping of the descriptions of all parameters */
+ virtual std::list< std::vector<std::string> > get_parameter_descriptions() {
+ return m_parameters;
+ }
+
+ /* Base function to set parameters. */
+ virtual void set_parameter(string parameter, string value) = 0;
+
+ /* Getting a parameter always returns a string. */
+ virtual string get_parameter(string parameter) = 0;
+
+ protected:
+ std::string m_name;
+ std::list< std::vector<std::string> > m_parameters;
+};
+
+/* Implements a Remote controller based on a simple telnet CLI
+ * that listens on localhost
+ */
+class RemoteControllerTelnet : public BaseRemoteController {
+ public:
+ RemoteControllerTelnet()
+ : m_running(false), m_port(0) {}
+
+ RemoteControllerTelnet(int port)
+ : m_running(true), m_port(port),
+ m_child_thread(&RemoteControllerTelnet::process, this, 0)
+ {}
+
+ ~RemoteControllerTelnet() {
+ m_running = false;
+ if (m_port) {
+ m_child_thread.interrupt();
+ m_child_thread.join();
+ }
+ }
+
+ void process(long);
+
+ void dispatch_command(tcp::socket& socket, string command);
+
+ void reply(tcp::socket& socket, string message);
+
+ void enrol(RemoteControllable* controllable) {
+ m_cohort.push_back(controllable);
+ }
+
+
+ private:
+ RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other);
+ RemoteControllerTelnet(const RemoteControllerTelnet& other);
+
+ vector<string> tokenise_(string message) {
+ vector<string> all_tokens;
+
+ boost::char_separator<char> sep(" ");
+ boost::tokenizer< boost::char_separator<char> > tokens(message, sep);
+ BOOST_FOREACH (const string& t, tokens) {
+ all_tokens.push_back(t);
+ }
+ return all_tokens;
+ }
+
+ RemoteControllable* get_controllable_(string name) {
+ for (list<RemoteControllable*>::iterator it = m_cohort.begin();
+ it != m_cohort.end(); ++it) {
+ if ((*it)->get_rc_name() == name)
+ {
+ return *it;
+ }
+ }
+ throw ParameterError("Module name unknown");
+ }
+
+ list< vector<string> > get_parameter_descriptions_(string name) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->get_parameter_descriptions();
+ }
+
+ list<string> get_param_list_(string name) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->get_supported_parameters();
+ }
+
+ list< vector<string> > get_param_list_values_(string name) {
+ RemoteControllable* controllable = get_controllable_(name);
+
+ list< vector<string> > allparams;
+ list<string> params = controllable->get_supported_parameters();
+ for (list<string>::iterator it = params.begin(); it != params.end(); ++it) {
+ vector<string> item;
+ item.push_back(*it);
+ item.push_back(controllable->get_parameter(*it));
+
+ allparams.push_back(item);
+ }
+ return allparams;
+ }
+
+ string get_param_(string name, string param) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->get_parameter(param);
+ }
+
+ void set_param_(string name, string param, string value) {
+ RemoteControllable* controllable = get_controllable_(name);
+ return controllable->set_parameter(param, value);
+ }
+
+ bool m_running;
+ boost::thread m_child_thread;
+
+ /* This controller commands the controllables in the cohort */
+ list<RemoteControllable*> m_cohort;
+
+ std::string m_welcome;
+ std::string m_prompt;
+
+ int m_port;
+};
+
+
+/* The Dummy remote controller does nothing
+ */
+class RemoteControllerDummy : public BaseRemoteController {
+ public:
+ void enrol(RemoteControllable* controllable) {};
+};
+
+#endif
+
diff --git a/src/StatsServer.h b/src/StatsServer.h
index 5bbf327..8462e6a 100644
--- a/src/StatsServer.h
+++ b/src/StatsServer.h
@@ -80,6 +80,7 @@ struct InputStat
class StatsServer
{
public:
+ StatsServer() : m_listenport(0), m_running(false) {}
StatsServer(int listen_port) :
m_listenport(listen_port),
m_running(true),
@@ -139,5 +140,8 @@ class StatsServer
mutable boost::mutex m_mutex;
};
+
+extern StatsServer* global_stats;
+
#endif
diff --git a/src/UdpSocket.h b/src/UdpSocket.h
index a500606..d96e01e 100644
--- a/src/UdpSocket.h
+++ b/src/UdpSocket.h
@@ -115,7 +115,7 @@ class UdpPacket {
void setSize(unsigned newSize);
InetAddress &getAddress();
// Not implemented
- const UdpPacket &operator=(const UdpPacket&);
+ const UdpPacket& operator=(const UdpPacket&);
private:
char *dataBuf;
diff --git a/src/dabInput.h b/src/dabInput.h
index 685f7b2..12823b8 100644
--- a/src/dabInput.h
+++ b/src/dabInput.h
@@ -26,8 +26,10 @@
# include "config.h"
#endif
#include "Log.h"
-extern Logger etiLog;
+#include "RemoteControl.h"
+#include <string>
+extern Logger etiLog;
struct dabInputOperations {
int (*init)(void** args);
@@ -44,6 +46,68 @@ struct dabInputOperations {
bool operator==(const dabInputOperations&);
};
+/* New input object base */
+class DabInputBase {
+ public:
+ virtual int open(const std::string name) = 0;
+ virtual int readFrame(void* buffer, int size) = 0;
+ virtual int setBitrate(int bitrate) = 0;
+ virtual int close() = 0;
+
+ virtual ~DabInputBase() {};
+ protected:
+ DabInputBase() {};
+};
+
+/* Wrapper class for old-style dabInputOperations inputs */
+class DabInputCompatible : public DabInputBase {
+ public:
+ DabInputCompatible(dabInputOperations ops)
+ : m_ops(ops)
+ { m_ops.init(&args); }
+
+ virtual ~DabInputCompatible()
+ { m_ops.clean(&args); }
+
+ virtual int open(const std::string name)
+ { return m_ops.open(args, name.c_str()); }
+
+ virtual int setbuf(int size)
+ { return m_ops.setbuf(args, size); }
+
+ virtual int readFrame(void* buffer, int size)
+ {
+ if (m_ops.lock) {
+ m_ops.lock(args);
+ }
+ int result = m_ops.readFrame(&m_ops, args, buffer, size);
+ if (m_ops.unlock) {
+ m_ops.unlock(args);
+ }
+ return result;
+ }
+
+ virtual int setBitrate(int bitrate)
+ { return m_ops.setBitrate(&m_ops, args, bitrate); }
+
+ virtual int close()
+ { return m_ops.close(args); }
+
+ virtual int rewind()
+ { return m_ops.rewind(args); }
+
+ virtual int read(void* buffer, int size)
+ { return m_ops.read(args, buffer, size); }
+
+ virtual dabInputOperations getOpts() { return m_ops; }
+
+ private:
+ DabInputCompatible& operator=(const DabInputCompatible& other);
+ DabInputCompatible(const DabInputCompatible& other);
+
+ dabInputOperations m_ops;
+ void* args;
+};
int dabInputSetbuf(void* args, int size);
int dabInputSetbitrate(dabInputOperations* ops, void* args, int bitrate);
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 580e82d..c84e81c 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -34,7 +34,6 @@
#include "dabInput.h"
#include "dabInputZmq.h"
-#include "dabInputFifo.h"
#include "StatsServer.h"
#ifdef HAVE_CONFIG_H
@@ -44,182 +43,153 @@
#ifdef HAVE_INPUT_ZEROMQ
#include <stdio.h>
-#include <zmq.h>
+#include <zmq.hpp>
#include <list>
+#include <exception>
#include <string.h>
#include <string>
+#include <sstream>
#include <limits.h>
#ifdef __MINGW32__
# define bzero(s, n) memset(s, 0, n)
#endif
-extern StatsServer global_stats;
+extern StatsServer* global_stats;
-struct dabInputOperations dabInputZmqOperations = {
- dabInputZmqInit,
- dabInputZmqOpen,
- dabInputSetbuf,
- NULL,
- NULL,
- NULL,
- dabInputZmqReadFrame,
- dabInputSetbitrate,
- dabInputZmqClose,
- dabInputZmqClean,
- NULL
-};
-
-
-int dabInputZmqInit(void** args)
+int DabInputZmq::open(const std::string inputUri)
{
- dabInputZmqData* input = new dabInputZmqData;
- input->zmq_context = zmq_ctx_new();
- if (input->zmq_context == NULL) {
- etiLog.log(error, "Failed to initialise ZeroMQ context: %s\n", zmq_strerror(errno));
- return 1;
+ // Prepare the ZMQ socket to accept connections
+ try {
+ m_zmq_sock.bind(inputUri.c_str());
}
-
- input->zmq_sock = zmq_socket(input->zmq_context, ZMQ_SUB);
- if (input->zmq_sock == NULL) {
- etiLog.log(error, "Failed to initialise ZeroMQ socket: %s\n", zmq_strerror(errno));
- return 1;
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ bind for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
}
- input->prebuffering = INPUT_ZMQ_PREBUFFERING;
-
- *args = input;
-
- return 0;
-}
-
-
-int dabInputZmqOpen(void* args, const char* inputUri)
-{
- dabInputZmqData* input = (dabInputZmqData*)args;
-
- std::string uri = "tcp://" + std::string(inputUri);
- int connect_error = zmq_bind(input->zmq_sock, uri.c_str());
-
- if (connect_error < 0) {
- etiLog.log(error, "Failed to connect socket to uri '%s': %s\n", uri.c_str(), zmq_strerror(errno));
- return 1;
+ try {
+ m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
-
- connect_error = zmq_setsockopt(input->zmq_sock, ZMQ_SUBSCRIBE, NULL, 0);
- if (connect_error < 0) {
- etiLog.log(error, "Failed to subscribe to zmq messages: %s\n", zmq_strerror(errno));
- return 1;
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set socket options for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
}
- global_stats.registerInput(uri);
+ // We want to appear in the statistics !
+ global_stats->registerInput(m_name);
- input->uri = uri;
return 0;
}
-
// size corresponds to a frame size. It is constant for a given bitrate
-int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)
+int DabInputZmq::readFrame(void* buffer, int size)
{
int rc;
- dabInputZmqData* input = (dabInputZmqData*)args;
/* We must *always* read data from the ZMQ socket,
* to make sure that ZMQ internal buffers are emptied
* quickly. It's the only way to control the buffers
* of the whole path from encoder to our frame_buffer.
*/
- rc = dabInputZmqReadFromSocket(input, size);
+ rc = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
- if (input->frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
- global_stats.notifyOverrun(input->uri);
+ if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ global_stats->notifyOverrun(m_name);
/* If the buffer is really too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length.
*/
- if (input->frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
- size_t over_max = input->frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
+ if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
while (over_max--) {
- input->frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
}
}
else {
/* Our frame_buffer contains DAB logical frames. Five of these make one
* AAC superframe.
*
- * Dropping this superframe amounts to dropping 120ms of audio. */
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
- input->frame_buffer.pop_front();
+ * Dropping this superframe amounts to dropping 120ms of audio.
+ *
+ * We're actually not sure to drop five DAB logical frames
+ * beloning to the same AAC superframe. It is assumed that no
+ * receiver will crash because of this. At least, the DAB logical frame
+ * vs. AAC superframe alignment is preserved.
+ *
+ * TODO: of course this assumption probably doesn't hold. Fix this !
+ * */
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
}
}
- if (input->prebuffering > 0) {
+ if (m_prebuffering > 0) {
if (rc > 0)
- input->prebuffering--;
- if (input->prebuffering == 0)
+ m_prebuffering--;
+ if (m_prebuffering == 0)
etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
- input->uri.c_str());
+ m_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
- global_stats.notifyUnderrun(input->uri);
+ global_stats->notifyUnderrun(m_name);
memset(buffer, 0, size);
return size;
}
// Save stats data in bytes, not in frames
- global_stats.notifyBuffer(input->uri, input->frame_buffer.size() * size);
+ global_stats->notifyBuffer(m_name, m_frame_buffer.size() * size);
- if (input->frame_buffer.empty()) {
+ if (m_frame_buffer.empty()) {
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
- input->uri.c_str());
+ m_name.c_str());
// reset prebuffering
- input->prebuffering = INPUT_ZMQ_PREBUFFERING;
+ m_prebuffering = INPUT_ZMQ_PREBUFFERING;
/* We have no data to give, we give a zeroed frame */
- global_stats.notifyUnderrun(input->uri);
+ global_stats->notifyUnderrun(m_name);
memset(buffer, 0, size);
return size;
}
else
{
/* Normal situation, give a frame from the frame_buffer */
-
- char* newframe = input->frame_buffer.front();
+ char* newframe = m_frame_buffer.front();
memcpy(buffer, newframe, size);
delete[] newframe;
- input->frame_buffer.pop_front();
+ m_frame_buffer.pop_front();
return size;
}
}
// Read a superframe from the socket, cut it into five frames, and push to list
-int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
+int DabInputZmq::readFromSocket(int framesize)
{
int rc;
- zmq_msg_t msg;
- rc = zmq_msg_init(&msg);
- if (rc == -1) {
- etiLog.log(error, "Failed to init zmq message: %s\n", zmq_strerror(errno));
- return 0;
- }
+ int nBytes;
+ zmq::message_t msg;
- int nBytes = zmq_msg_recv(&msg, input->zmq_sock, ZMQ_DONTWAIT);
- if (nBytes == -1) {
- if (errno != EAGAIN) {
- etiLog.log(error, "Failed to receive zmq message: %s\n", zmq_strerror(errno));
+ try {
+ nBytes = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (nBytes == 0) {
+ return 0;
}
- zmq_msg_close(&msg);
- return 0;
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) << "Failed to receive from zmq socket " <<
+ m_name << ": " << err.what();
}
- char* data = (char*)zmq_msg_data(&msg);
+ char* data = (char*)msg.data();
/* TS 102 563, Section 6:
* Audio super frames are transported in five successive DAB logical frames
@@ -227,10 +197,10 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
*/
if (nBytes == 5*framesize)
{
- if (input->frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
etiLog.level(warn) <<
- "inputZMQ " << input->uri <<
- " buffer full (" << input->frame_buffer.size() << "),"
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming superframe !";
nBytes = 0;
}
@@ -241,40 +211,66 @@ int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize)
framestart += framesize) {
char* frame = new char[framesize];
memcpy(frame, framestart, framesize);
- input->frame_buffer.push_back(frame);
+ m_frame_buffer.push_back(frame);
}
}
}
else
{
etiLog.level(error) <<
- "inputZMQ " << input->uri <<
- " wrong data size: recv'd" << nBytes <<
+ "inputZMQ " << m_name <<
+ " wrong data size: recv'd " << nBytes <<
", need " << 5*framesize << ".";
nBytes = 0;
}
- zmq_msg_close(&msg);
return nBytes;
}
-
-int dabInputZmqClose(void* args)
+int DabInputZmq::close()
{
- dabInputZmqData* input = (dabInputZmqData*)args;
- zmq_close(input->zmq_sock);
+ m_zmq_sock.close();
return 0;
}
+int DabInputZmq::setBitrate(int bitrate)
+{
+ m_bitrate = bitrate;
+ return bitrate; // TODO do a nice check here
+}
-int dabInputZmqClean(void** args)
+/********* REMOTE CONTROL ***********/
+
+void DabInputZmq::set_parameter(string parameter, string value)
{
- dabInputZmqData* input = (dabInputZmqData*)(*args);
- zmq_ctx_term(input->zmq_context);
- delete input;
- return 0;
+ stringstream ss(value);
+ ss.exceptions ( stringstream::failbit | stringstream::badbit );
+
+ if (parameter == "buffer") {
+ throw ParameterError("Parameter 'ntaps' is read-only");
+ }
+ else {
+ stringstream ss;
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
}
+string DabInputZmq::get_parameter(string parameter)
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << INPUT_ZMQ_MAX_BUFFER_SIZE;
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+
+}
#endif
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index fdca9e6..bb37a94 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -43,11 +43,10 @@
#ifdef HAVE_INPUT_ZEROMQ
-#include <zmq.h>
+#include <zmq.hpp>
#include <list>
#include <string>
#include "dabInput.h"
-#include "dabInputFifo.h"
#include "StatsServer.h"
/* The frame_buffer contains DAB logical frames as defined in
@@ -62,27 +61,40 @@
#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms
-extern struct dabInputOperations dabInputZmqOperations;
-
-struct dabInputZmqData {
- void* zmq_context;
- void* zmq_sock;
- std::list<char*> frame_buffer; //stores elements of type char[<framesize>]
- int prebuffering;
- std::string uri;
+class DabInputZmq : public DabInputBase, public RemoteControllable {
+ public:
+ DabInputZmq(const std::string name)
+ : RemoteControllable(name),
+ m_name(name), m_zmq_context(1),
+ m_zmq_sock(m_zmq_context, ZMQ_SUB),
+ m_prebuffering(INPUT_ZMQ_PREBUFFERING),
+ m_bitrate(0) {
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [aac superframes]");
+ }
+
+ virtual int open(const std::string inputUri);
+ virtual int readFrame(void* buffer, int size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ /* Remote control */
+ virtual void set_parameter(string parameter, string value);
+
+ /* Getting a parameter always returns a string. */
+ virtual string get_parameter(string parameter);
+
+ private:
+ int readFromSocket(int framesize);
+
+ std::string m_name;
+ zmq::context_t m_zmq_context;
+ zmq::socket_t m_zmq_sock; // handle for the zmq socket
+ int m_prebuffering;
+ std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]
+ int m_bitrate;
};
-
-int dabInputZmqInit(void** args);
-int dabInputZmqOpen(void* args, const char* inputUri);
-int dabInputZmqReadFrame(dabInputOperations* ops, void* args, void* buffer, int size);
-int dabInputZmqClose(void* args);
-int dabInputZmqClean(void** args);
-
-// Get new message from ZeroMQ
-int dabInputZmqReadFromSocket(dabInputZmqData* input, int framesize);
-
-
#endif // HAVE_INPUT_ZMQ
#endif // DAB_INPUT_ZMQ_H
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index 5cb619f..bbf27e3 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -134,13 +134,16 @@ class DabOutputRaw : public DabOutput
#ifndef _WIN32
isCyclades_ = other.isCyclades_;
#endif
- buffer_ = other.buffer_;
+ buffer_ = new unsigned char[6144];
+ memcpy(buffer_, other.buffer_, 6144);
}
~DabOutputRaw() {
delete[] buffer_;
}
+ const DabOutputRaw operator=(const DabOutputRaw& other);
+
int Open(const char* name);
int Write(void* buffer, int size);
int Close();
@@ -164,11 +167,11 @@ class DabOutputUdp : public DabOutput
socket_ = new UdpSocket();
}
- DabOutputUdp(const DabOutputUdp& other)
- {
- packet_ = other.packet_;
- socket_ = other.socket_;
- }
+ // make sure we don't copy this output around
+ // the UdpPacket and UdpSocket do not support
+ // copying either
+ DabOutputUdp(const DabOutputUdp& other);
+ DabOutputUdp operator=(const DabOutputUdp& other);
~DabOutputUdp() {
delete socket_;
@@ -195,12 +198,8 @@ class DabOutputTcp : public DabOutput
client = NULL;
}
- DabOutputTcp(const DabOutputTcp& other)
- {
- server = other.server;
- client = other.client;
- thread_ = other.thread_;
- }
+ DabOutputTcp(const DabOutputTcp& other);
+ DabOutputTcp operator=(const DabOutputTcp& other);
~DabOutputTcp() {
diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp
index 459120f..607fe71 100644
--- a/src/dabOutput/dabOutputUdp.cpp
+++ b/src/dabOutput/dabOutputUdp.cpp
@@ -38,8 +38,7 @@
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/ioctl.h>
-# include <linux/if_packet.h>
-# include <linux/netdevice.h>
+# include <net/if_packet.h>
# include <net/if_arp.h>
#endif