summaryrefslogtreecommitdiffstats
path: root/src/DabMod.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli (think) <matthias@mpb.li>2013-11-10 21:50:12 +0100
committerMatthias P. Braendli (think) <matthias@mpb.li>2013-11-10 21:50:12 +0100
commit5d965e80be2e6ab62bc82fb2e0d4d472153ad241 (patch)
tree5add36f337b0de524b3d098f0b1fcc8d68aba0d7 /src/DabMod.cpp
parent4f9a01a80570437b86e69eb0542b13df9a20743d (diff)
downloaddabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.gz
dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.bz2
dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.zip
crc-dabmod: add ZeroMQ input module
Diffstat (limited to 'src/DabMod.cpp')
-rw-r--r--src/DabMod.cpp79
1 files changed, 52 insertions, 27 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 64e557c..1e8937d 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -153,8 +153,9 @@ int main(int argc, char* argv[])
int ret = 0;
bool loop = false;
std::string inputName = "";
+ std::string inputTransport = "file";
- const char* outputName;
+ std::string outputName;
int useFileOutput = 0;
int useUHDOutput = 0;
@@ -190,6 +191,10 @@ int main(int argc, char* argv[])
Logger logger;
InputFileReader inputFileReader(logger);
+#if defined(HAVE_INPUT_ZEROMQ)
+ InputZeroMQReader inputZeroMQReader(logger);
+#endif
+ InputReader* inputReader;
signal(SIGINT, signalHandler);
@@ -206,7 +211,7 @@ int main(int argc, char* argv[])
if (c != 'C') {
use_configuration_cmdline = true;
}
-
+
switch (c) {
case 'C':
use_configuration_file = true;
@@ -335,7 +340,8 @@ int main(int argc, char* argv[])
loop = true;
}
- inputName = pt.get("input.filename", "/dev/stdin");
+ inputTransport = pt.get("input.transport", "file");
+ inputName = pt.get("input.source", "/dev/stdin");
// log parameters:
if (pt.get("log.syslog", 0) == 1) {
@@ -391,7 +397,7 @@ int main(int argc, char* argv[])
if (output_selected == "file") {
try {
- outputName = pt.get<std::string>("fileoutput.filename").c_str();
+ outputName = pt.get<std::string>("fileoutput.filename");
}
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
@@ -477,15 +483,22 @@ int main(int argc, char* argv[])
modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY;
}
-
// Setting ETI input filename
if (inputName == "") {
if (optind < argc) {
inputName = argv[optind++];
- } else {
+
+ if (inputName.substr(0, 4) == "zmq+" &&
+ inputName.find("://") != std::string::npos) {
+ // if the name starts with zmq+XYZ://somewhere:port
+ inputTransport = "zeromq";
+ }
+ }
+ else {
inputName = "/dev/stdin";
}
}
+
// Checking unused arguments
if (optind != argc) {
fprintf(stderr, "Invalid arguments:");
@@ -507,13 +520,14 @@ int main(int argc, char* argv[])
// Print settings
fprintf(stderr, "Input\n");
- fprintf(stderr, " Name: %s\n", inputName.c_str());
+ fprintf(stderr, " Type: %s\n", inputTransport.c_str());
+ fprintf(stderr, " Source: %s\n", inputName.c_str());
fprintf(stderr, "Output\n");
if (useUHDOutput) {
fprintf(stderr, " UHD, Device: %s\n", outputuhd_conf.device);
}
else if (useFileOutput) {
- fprintf(stderr, " Name: %s\n", outputName);
+ fprintf(stderr, " Name: %s\n", outputName.c_str());
}
fprintf(stderr, " Sampling rate: ");
if (outputRate > 1000) {
@@ -526,20 +540,39 @@ int main(int argc, char* argv[])
fprintf(stderr, "%zu Hz\n", outputRate);
}
- // Opening ETI input file
- if (inputFileReader.Open(inputName) == -1) {
- fprintf(stderr, "Unable to open input file!\n");
- logger.level(error) << "Unable to open input file!";
+ if (inputTransport == "file") {
+ // Opening ETI input file
+ if (inputFileReader.Open(inputName, loop) == -1) {
+ fprintf(stderr, "Unable to open input file!\n");
+ logger.level(error) << "Unable to open input file!";
+ ret = -1;
+ goto END_MAIN;
+ }
+
+ inputReader = &inputFileReader;
+ }
+ else if (inputTransport == "zeromq") {
+#if !defined(HAVE_INPUT_ZEROMQ)
+ fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n");
ret = -1;
goto END_MAIN;
+#else
+ inputZeroMQReader.Open(inputName);
+ inputReader = &inputZeroMQReader;
+#endif
}
+ else
+ {
+ fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str());
+ ret = -1;
+ goto END_MAIN;
+ }
+
if (useFileOutput) {
// Opening COFDM output file
- if (outputName != NULL) {
- fprintf(stderr, "Using file output\n");
- output = new OutputFile(outputName);
- }
+ fprintf(stderr, "Using file output '%s'\n", outputName.c_str());
+ output = new OutputFile(outputName);
}
else if (useUHDOutput) {
fprintf(stderr, "Using UHD output\n");
@@ -553,7 +586,6 @@ int main(int argc, char* argv[])
logger.level(error) << "UHD initialisation failed:" << e.what();
goto END_MAIN;
}
-
}
flowgraph = new Flowgraph();
@@ -568,7 +600,7 @@ int main(int argc, char* argv[])
((OutputUHD*)output)->setETIReader(modulator->getEtiReader());
}
- inputFileReader.PrintInfo();
+ inputReader->PrintInfo();
try {
while (running) {
@@ -578,7 +610,7 @@ int main(int argc, char* argv[])
PDEBUG("*****************************************\n");
PDEBUG("* Starting main loop\n");
PDEBUG("*****************************************\n");
- while ((framesize = inputFileReader.GetNextFrame(data.getData())) > 0) {
+ while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) {
if (!running) {
break;
}
@@ -588,7 +620,6 @@ int main(int argc, char* argv[])
PDEBUG("*****************************************\n");
PDEBUG("* Read frame %lu\n", frame);
PDEBUG("*****************************************\n");
- fprintf(stderr, "Reading frame %lu\n", frame);
////////////////////////////////////////////////////////////////
// Proccessing data
@@ -597,17 +628,11 @@ int main(int argc, char* argv[])
}
if (framesize == 0) {
fprintf(stderr, "End of file reached.\n");
- if (!loop) {
- running = false;
- } else {
- fprintf(stderr, "Rewinding file.\n");
- inputFileReader.Rewind();
- }
}
else {
fprintf(stderr, "Input read error.\n");
- running = false;
}
+ running = false;
}
} catch (std::exception& e) {
fprintf(stderr, "EXCEPTION: %s\n", e.what());