From 5d965e80be2e6ab62bc82fb2e0d4d472153ad241 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli (think)" Date: Sun, 10 Nov 2013 21:50:12 +0100 Subject: crc-dabmod: add ZeroMQ input module --- src/DabMod.cpp | 79 ++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 27 deletions(-) (limited to 'src/DabMod.cpp') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 64e557c..1e8937d 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -153,8 +153,9 @@ int main(int argc, char* argv[]) int ret = 0; bool loop = false; std::string inputName = ""; + std::string inputTransport = "file"; - const char* outputName; + std::string outputName; int useFileOutput = 0; int useUHDOutput = 0; @@ -190,6 +191,10 @@ int main(int argc, char* argv[]) Logger logger; InputFileReader inputFileReader(logger); +#if defined(HAVE_INPUT_ZEROMQ) + InputZeroMQReader inputZeroMQReader(logger); +#endif + InputReader* inputReader; signal(SIGINT, signalHandler); @@ -206,7 +211,7 @@ int main(int argc, char* argv[]) if (c != 'C') { use_configuration_cmdline = true; } - + switch (c) { case 'C': use_configuration_file = true; @@ -335,7 +340,8 @@ int main(int argc, char* argv[]) loop = true; } - inputName = pt.get("input.filename", "/dev/stdin"); + inputTransport = pt.get("input.transport", "file"); + inputName = pt.get("input.source", "/dev/stdin"); // log parameters: if (pt.get("log.syslog", 0) == 1) { @@ -391,7 +397,7 @@ int main(int argc, char* argv[]) if (output_selected == "file") { try { - outputName = pt.get("fileoutput.filename").c_str(); + outputName = pt.get("fileoutput.filename"); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; @@ -477,15 +483,22 @@ int main(int argc, char* argv[]) modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY; } - // Setting ETI input filename if (inputName == "") { if (optind < argc) { inputName = argv[optind++]; - } else { + + if (inputName.substr(0, 4) == "zmq+" && + inputName.find("://") != std::string::npos) { + // if the name starts with zmq+XYZ://somewhere:port + inputTransport = "zeromq"; + } + } + else { inputName = "/dev/stdin"; } } + // Checking unused arguments if (optind != argc) { fprintf(stderr, "Invalid arguments:"); @@ -507,13 +520,14 @@ int main(int argc, char* argv[]) // Print settings fprintf(stderr, "Input\n"); - fprintf(stderr, " Name: %s\n", inputName.c_str()); + fprintf(stderr, " Type: %s\n", inputTransport.c_str()); + fprintf(stderr, " Source: %s\n", inputName.c_str()); fprintf(stderr, "Output\n"); if (useUHDOutput) { fprintf(stderr, " UHD, Device: %s\n", outputuhd_conf.device); } else if (useFileOutput) { - fprintf(stderr, " Name: %s\n", outputName); + fprintf(stderr, " Name: %s\n", outputName.c_str()); } fprintf(stderr, " Sampling rate: "); if (outputRate > 1000) { @@ -526,20 +540,39 @@ int main(int argc, char* argv[]) fprintf(stderr, "%zu Hz\n", outputRate); } - // Opening ETI input file - if (inputFileReader.Open(inputName) == -1) { - fprintf(stderr, "Unable to open input file!\n"); - logger.level(error) << "Unable to open input file!"; + if (inputTransport == "file") { + // Opening ETI input file + if (inputFileReader.Open(inputName, loop) == -1) { + fprintf(stderr, "Unable to open input file!\n"); + logger.level(error) << "Unable to open input file!"; + ret = -1; + goto END_MAIN; + } + + inputReader = &inputFileReader; + } + else if (inputTransport == "zeromq") { +#if !defined(HAVE_INPUT_ZEROMQ) + fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n"); ret = -1; goto END_MAIN; +#else + inputZeroMQReader.Open(inputName); + inputReader = &inputZeroMQReader; +#endif } + else + { + fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); + ret = -1; + goto END_MAIN; + } + if (useFileOutput) { // Opening COFDM output file - if (outputName != NULL) { - fprintf(stderr, "Using file output\n"); - output = new OutputFile(outputName); - } + fprintf(stderr, "Using file output '%s'\n", outputName.c_str()); + output = new OutputFile(outputName); } else if (useUHDOutput) { fprintf(stderr, "Using UHD output\n"); @@ -553,7 +586,6 @@ int main(int argc, char* argv[]) logger.level(error) << "UHD initialisation failed:" << e.what(); goto END_MAIN; } - } flowgraph = new Flowgraph(); @@ -568,7 +600,7 @@ int main(int argc, char* argv[]) ((OutputUHD*)output)->setETIReader(modulator->getEtiReader()); } - inputFileReader.PrintInfo(); + inputReader->PrintInfo(); try { while (running) { @@ -578,7 +610,7 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); - while ((framesize = inputFileReader.GetNextFrame(data.getData())) > 0) { + while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { if (!running) { break; } @@ -588,7 +620,6 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Read frame %lu\n", frame); PDEBUG("*****************************************\n"); - fprintf(stderr, "Reading frame %lu\n", frame); //////////////////////////////////////////////////////////////// // Proccessing data @@ -597,17 +628,11 @@ int main(int argc, char* argv[]) } if (framesize == 0) { fprintf(stderr, "End of file reached.\n"); - if (!loop) { - running = false; - } else { - fprintf(stderr, "Rewinding file.\n"); - inputFileReader.Rewind(); - } } else { fprintf(stderr, "Input read error.\n"); - running = false; } + running = false; } } catch (std::exception& e) { fprintf(stderr, "EXCEPTION: %s\n", e.what()); -- cgit v1.2.3