aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2015-02-22 17:56:49 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2015-02-22 21:03:35 +0100
commit1ada0901a8fa687576fa4953044fd43bc6c06f8a (patch)
tree923be8c22fc54a25011f84a966b896f850990c69 /src
parent7cee56f37001640b88f4ac1249624c9c9758e844 (diff)
downloaddabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.gz
dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.bz2
dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.zip
Move main flowgraph to distinct function
Diffstat (limited to 'src')
-rw-r--r--src/DabMod.cpp193
-rw-r--r--src/InputReader.h2
-rw-r--r--src/InputZeroMQReader.cpp11
3 files changed, 141 insertions, 65 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 1f6eedf..8178a75 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -85,6 +85,30 @@ void signalHandler(int signalNb)
running = 0;
}
+struct modulator_data
+{
+ modulator_data() :
+ inputReader(NULL),
+ framecount(0),
+ flowgraph(NULL),
+ rcs(NULL) {}
+
+ InputReader* inputReader;
+ Buffer data;
+ uint64_t framecount;
+
+ Flowgraph* flowgraph;
+ RemoteControllers* rcs;
+};
+
+enum run_modulator_state {
+ MOD_FAILURE,
+ MOD_NORMAL_END,
+ MOD_AGAIN
+};
+
+run_modulator_state run_modulator(modulator_data& m);
+
int main(int argc, char* argv[])
{
int ret = 0;
@@ -99,14 +123,12 @@ int main(int argc, char* argv[])
std::string fileOutputFormat = "complexf";
int useUHDOutput = 0;
- uint64_t frame = 0;
size_t outputRate = 2048000;
size_t clockRate = 0;
unsigned dabMode = 0;
float digitalgain = 1.0f;
float normalise = 1.0f;
GainMode gainMode = GAIN_VAR;
- Buffer data;
/* UHD requires the input I and Q samples to be in the interval
@@ -130,6 +152,8 @@ int main(int argc, char* argv[])
OutputUHDConfig outputuhd_conf;
#endif
+ modulator_data m;
+
// To handle the timestamp offset of the modulator
struct modulator_offset_config modconf;
modconf.use_offset_file = false;
@@ -141,13 +165,15 @@ int main(int argc, char* argv[])
shared_ptr<ModOutput> output;
RemoteControllers rcs;
+ m.rcs = &rcs;
+
+ bool run_again = true;
Logger logger;
InputFileReader inputFileReader(logger);
#if defined(HAVE_ZEROMQ)
InputZeroMQReader inputZeroMQReader(logger);
#endif
- InputReader* inputReader;
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
@@ -673,7 +699,7 @@ int main(int argc, char* argv[])
throw std::runtime_error("Unable to open input");
}
- inputReader = &inputFileReader;
+ m.inputReader = &inputFileReader;
}
else if (inputTransport == "zeromq") {
#if !defined(HAVE_ZEROMQ)
@@ -688,7 +714,7 @@ int main(int argc, char* argv[])
else {
inputZeroMQReader.Open(inputName, inputMaxFramesQueued);
}
- inputReader = &inputZeroMQReader;
+ m.inputReader = &inputZeroMQReader;
#endif
}
else
@@ -698,59 +724,103 @@ int main(int argc, char* argv[])
throw std::runtime_error("Unable to open input");
}
- if (useFileOutput) {
- if (fileOutputFormat == "complexf") {
- output = shared_ptr<OutputFile>(new OutputFile(outputName));
- }
- else if (fileOutputFormat == "s8") {
- // We must normalise the samples to the interval [-127.0; 127.0]
- normalise = 127.0f / normalise_factor;
+ while (run_again) {
+ Flowgraph flowgraph;
- format_converter = shared_ptr<FormatConverter>(new FormatConverter());
+ if (useFileOutput) {
+ if (fileOutputFormat == "complexf") {
+ output = shared_ptr<OutputFile>(new OutputFile(outputName));
+ }
+ else if (fileOutputFormat == "s8") {
+ // We must normalise the samples to the interval [-127.0; 127.0]
+ normalise = 127.0f / normalise_factor;
- output = shared_ptr<OutputFile>(new OutputFile(outputName));
+ format_converter = shared_ptr<FormatConverter>(new FormatConverter());
+
+ output = shared_ptr<OutputFile>(new OutputFile(outputName));
+ }
}
- }
#if defined(HAVE_OUTPUT_UHD)
- else if (useUHDOutput) {
-
- normalise = 1.0f / normalise_factor;
-
- outputuhd_conf.sampleRate = outputRate;
- output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger));
- ((OutputUHD*)output.get())->enrol_at(rcs);
- }
+ else if (useUHDOutput) {
+ normalise = 1.0f / normalise_factor;
+ outputuhd_conf.sampleRate = outputRate;
+ output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger));
+ ((OutputUHD*)output.get())->enrol_at(rcs);
+ }
#endif
#if defined(HAVE_ZEROMQ)
- else if (useZeroMQOutput) {
- /* We normalise the same way as for the UHD output */
- normalise = 1.0f / normalise_factor;
-
- output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName));
- }
+ else if (useZeroMQOutput) {
+ /* We normalise the same way as for the UHD output */
+ normalise = 1.0f / normalise_factor;
+ output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName));
+ }
#endif
- data.setLength(6144);
- shared_ptr<InputMemory> input(new InputMemory(&data));
- shared_ptr<DabModulator> modulator(new DabModulator(modconf, &rcs, logger, outputRate, clockRate,
- dabMode, gainMode, digitalgain, normalise, filterTapsFilename));
- flowgraph->connect(input, modulator);
- if (format_converter) {
- flowgraph->connect(modulator, format_converter);
- flowgraph->connect(format_converter, output);
- }
- else {
- flowgraph->connect(modulator, output);
- }
+ m.flowgraph = &flowgraph;
+ m.data.setLength(6144);
+
+ shared_ptr<InputMemory> input(new InputMemory(&m.data));
+ shared_ptr<DabModulator> modulator(
+ new DabModulator(modconf, &rcs, logger, outputRate, clockRate,
+ dabMode, gainMode, digitalgain, normalise, filterTapsFilename));
+
+ flowgraph.connect(input, modulator);
+ if (format_converter) {
+ flowgraph.connect(modulator, format_converter);
+ flowgraph.connect(format_converter, output);
+ }
+ else {
+ flowgraph.connect(modulator, output);
+ }
#if defined(HAVE_OUTPUT_UHD)
- if (useUHDOutput) {
- ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader());
- }
+ if (useUHDOutput) {
+ ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader());
+ }
#endif
- inputReader->PrintInfo();
+ m.inputReader->PrintInfo();
+
+ run_modulator_state st = run_modulator(m);
+
+ switch (st) {
+ case MOD_FAILURE:
+ fprintf(stderr, "\nModulator failure.\n");
+ run_again = false;
+ ret = 1;
+ break;
+ case MOD_NORMAL_END:
+ fprintf(stderr, "\nModulator stopped.\n");
+ ret = 0;
+ run_again = false;
+ break;
+ case MOD_AGAIN:
+ fprintf(stderr, "\nRestart modulator\n");
+ run_again = true;
+ running = true;
+ break;
+ }
+ fprintf(stderr, "\n\n");
+ fprintf(stderr, "%lu DAB frames encoded\n", m.framecount);
+ fprintf(stderr, "%f seconds encoded\n", (float)m.framecount * 0.024f);
+
+ fprintf(stderr, "\nCleaning flowgraph...\n");
+
+ m.data.setLength(0);
+ }
+
+ ////////////////////////////////////////////////////////////////////////
+ // Cleaning things
+ ////////////////////////////////////////////////////////////////////////
+
+ logger.level(info) << "Terminating";
+ return ret;
+}
+
+run_modulator_state run_modulator(modulator_data& m)
+{
+ run_modulator_state ret = MOD_FAILURE;
try {
while (running) {
@@ -759,26 +829,26 @@ int main(int argc, char* argv[])
PDEBUG("*****************************************\n");
PDEBUG("* Starting main loop\n");
PDEBUG("*****************************************\n");
- while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) {
+ while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {
if (!running) {
break;
}
- frame++;
+ m.framecount++;
PDEBUG("*****************************************\n");
- PDEBUG("* Read frame %lu\n", frame);
+ PDEBUG("* Read frame %lu\n", m.framecount);
PDEBUG("*****************************************\n");
////////////////////////////////////////////////////////////////
- // Proccessing data
+ // Processing data
////////////////////////////////////////////////////////////////
- flowgraph->run();
+ m.flowgraph->run();
/* Check every once in a while if the remote control
* is still working */
- if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) {
- rcs.check_faults();
+ if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) {
+ m.rcs->check_faults();
}
}
if (framesize == 0) {
@@ -788,24 +858,17 @@ int main(int argc, char* argv[])
fprintf(stderr, "Input read error.\n");
}
running = 0;
+ ret = MOD_NORMAL_END;
}
+ } catch (std::overflow_error& e) {
+ // The ZeroMQ input has overflowed its buffer
+ fprintf(stderr, "overflow error: %s\n", e.what());
+ ret = MOD_AGAIN;
} catch (std::exception& e) {
fprintf(stderr, "EXCEPTION: %s\n", e.what());
- ret = -1;
+ ret = MOD_FAILURE;
}
- ////////////////////////////////////////////////////////////////////////
- // Cleaning things
- ////////////////////////////////////////////////////////////////////////
- fprintf(stderr, "\n\n");
- fprintf(stderr, "%lu DAB frames encoded\n", frame);
- fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f);
-
- // Cif
- fprintf(stderr, "\nCleaning buffers...\n");
-
- logger.level(info) << "Terminating";
-
return ret;
}
diff --git a/src/InputReader.h b/src/InputReader.h
index 3e3e000..ee7d657 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -138,6 +138,8 @@ struct InputZeroMQThreadData
ThreadsafeQueue<uint8_t*> *in_messages;
std::string uri;
unsigned max_queued_frames;
+
+ bool running;
};
class InputZeroMQWorker
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 01d8720..5fab447 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
uint8_t* incoming;
in_messages_.wait_and_pop(incoming);
+ if (! workerdata_.running) {
+ throw std::overflow_error("InputZeroMQ worker dead");
+ }
+
memcpy(buffer, incoming, framesize);
delete incoming;
@@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
fprintf(stderr, "ZeroMQ buffer overfull !\n");
buffer_full = true;
+ throw std::runtime_error("ZMQ input full");
}
queue_size = workerdata->in_messages->size();
@@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
catch (zmq::error_t& err) {
fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
+ catch (std::exception& err) {
+ }
fprintf(stderr, "ZeroMQ input worker terminated\n");
subscriber.close();
+
+ workerdata->running = false;
+ workerdata->in_messages->notify();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
{
running = true;
+ workerdata->running = true;
recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
}