diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 17:56:49 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-02-22 21:03:35 +0100 | 
| commit | 1ada0901a8fa687576fa4953044fd43bc6c06f8a (patch) | |
| tree | 923be8c22fc54a25011f84a966b896f850990c69 /src | |
| parent | 7cee56f37001640b88f4ac1249624c9c9758e844 (diff) | |
| download | dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.gz dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.tar.bz2 dabmod-1ada0901a8fa687576fa4953044fd43bc6c06f8a.zip | |
Move main flowgraph to distinct function
Diffstat (limited to 'src')
| -rw-r--r-- | src/DabMod.cpp | 193 | ||||
| -rw-r--r-- | src/InputReader.h | 2 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 11 | 
3 files changed, 141 insertions, 65 deletions
| diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 1f6eedf..8178a75 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -85,6 +85,30 @@ void signalHandler(int signalNb)      running = 0;  } +struct modulator_data +{ +    modulator_data() : +        inputReader(NULL), +        framecount(0), +        flowgraph(NULL), +        rcs(NULL) {} + +    InputReader* inputReader; +    Buffer data; +    uint64_t framecount; + +    Flowgraph* flowgraph; +    RemoteControllers* rcs; +}; + +enum run_modulator_state { +    MOD_FAILURE, +    MOD_NORMAL_END, +    MOD_AGAIN +}; + +run_modulator_state run_modulator(modulator_data& m); +  int main(int argc, char* argv[])  {      int ret = 0; @@ -99,14 +123,12 @@ int main(int argc, char* argv[])      std::string fileOutputFormat = "complexf";      int useUHDOutput = 0; -    uint64_t frame = 0;      size_t outputRate = 2048000;      size_t clockRate = 0;      unsigned dabMode = 0;      float digitalgain = 1.0f;      float normalise = 1.0f;      GainMode gainMode = GAIN_VAR; -    Buffer data;      /* UHD requires the input I and Q samples to be in the interval @@ -130,6 +152,8 @@ int main(int argc, char* argv[])      OutputUHDConfig outputuhd_conf;  #endif +    modulator_data m; +      // To handle the timestamp offset of the modulator      struct modulator_offset_config modconf;      modconf.use_offset_file = false; @@ -141,13 +165,15 @@ int main(int argc, char* argv[])      shared_ptr<ModOutput> output;      RemoteControllers rcs; +    m.rcs = &rcs; + +    bool run_again = true;      Logger logger;      InputFileReader inputFileReader(logger);  #if defined(HAVE_ZEROMQ)      InputZeroMQReader inputZeroMQReader(logger);  #endif -    InputReader* inputReader;      struct sigaction sa;      memset(&sa, 0, sizeof(struct sigaction)); @@ -673,7 +699,7 @@ int main(int argc, char* argv[])              throw std::runtime_error("Unable to open input");          } -        inputReader = &inputFileReader; +        m.inputReader = &inputFileReader;      }      else if (inputTransport == "zeromq") {  #if !defined(HAVE_ZEROMQ) @@ -688,7 +714,7 @@ int main(int argc, char* argv[])          else {              inputZeroMQReader.Open(inputName, inputMaxFramesQueued);          } -        inputReader = &inputZeroMQReader; +        m.inputReader = &inputZeroMQReader;  #endif      }      else @@ -698,59 +724,103 @@ int main(int argc, char* argv[])          throw std::runtime_error("Unable to open input");      } -    if (useFileOutput) { -        if (fileOutputFormat == "complexf") { -            output = shared_ptr<OutputFile>(new OutputFile(outputName)); -        } -        else if (fileOutputFormat == "s8") { -            // We must normalise the samples to the interval [-127.0; 127.0] -            normalise = 127.0f / normalise_factor; +    while (run_again) { +        Flowgraph flowgraph; -            format_converter = shared_ptr<FormatConverter>(new FormatConverter()); +        if (useFileOutput) { +            if (fileOutputFormat == "complexf") { +                output = shared_ptr<OutputFile>(new OutputFile(outputName)); +            } +            else if (fileOutputFormat == "s8") { +                // We must normalise the samples to the interval [-127.0; 127.0] +                normalise = 127.0f / normalise_factor; -            output = shared_ptr<OutputFile>(new OutputFile(outputName)); +                format_converter = shared_ptr<FormatConverter>(new FormatConverter()); + +                output = shared_ptr<OutputFile>(new OutputFile(outputName)); +            }          } -    }  #if defined(HAVE_OUTPUT_UHD) -    else if (useUHDOutput) { - -        normalise = 1.0f / normalise_factor; - -        outputuhd_conf.sampleRate = outputRate; -        output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger)); -        ((OutputUHD*)output.get())->enrol_at(rcs); -    } +        else if (useUHDOutput) { +            normalise = 1.0f / normalise_factor; +            outputuhd_conf.sampleRate = outputRate; +            output = shared_ptr<OutputUHD>(new OutputUHD(outputuhd_conf, logger)); +            ((OutputUHD*)output.get())->enrol_at(rcs); +        }  #endif  #if defined(HAVE_ZEROMQ) -    else if (useZeroMQOutput) { -        /* We normalise the same way as for the UHD output */ -        normalise = 1.0f / normalise_factor; - -        output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName)); -    } +        else if (useZeroMQOutput) { +            /* We normalise the same way as for the UHD output */ +            normalise = 1.0f / normalise_factor; +            output = shared_ptr<OutputZeroMQ>(new OutputZeroMQ(outputName)); +        }  #endif -    data.setLength(6144); -    shared_ptr<InputMemory> input(new InputMemory(&data)); -    shared_ptr<DabModulator> modulator(new DabModulator(modconf, &rcs, logger, outputRate, clockRate, -            dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); -    flowgraph->connect(input, modulator); -    if (format_converter) { -        flowgraph->connect(modulator, format_converter); -        flowgraph->connect(format_converter, output); -    } -    else { -        flowgraph->connect(modulator, output); -    } +        m.flowgraph = &flowgraph; +        m.data.setLength(6144); + +        shared_ptr<InputMemory> input(new InputMemory(&m.data)); +        shared_ptr<DabModulator> modulator( +                new DabModulator(modconf, &rcs, logger, outputRate, clockRate, +                    dabMode, gainMode, digitalgain, normalise, filterTapsFilename)); + +        flowgraph.connect(input, modulator); +        if (format_converter) { +            flowgraph.connect(modulator, format_converter); +            flowgraph.connect(format_converter, output); +        } +        else { +            flowgraph.connect(modulator, output); +        }  #if defined(HAVE_OUTPUT_UHD) -    if (useUHDOutput) { -        ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); -    } +        if (useUHDOutput) { +            ((OutputUHD*)output.get())->setETIReader(modulator->getEtiReader()); +        }  #endif -    inputReader->PrintInfo(); +        m.inputReader->PrintInfo(); + +        run_modulator_state st = run_modulator(m); + +        switch (st) { +            case MOD_FAILURE: +                fprintf(stderr, "\nModulator failure.\n"); +                run_again = false; +                ret = 1; +                break; +            case MOD_NORMAL_END: +                fprintf(stderr, "\nModulator stopped.\n"); +                ret = 0; +                run_again = false; +                break; +            case MOD_AGAIN: +                fprintf(stderr, "\nRestart modulator\n"); +                run_again = true; +                running = true; +                break; +        } +        fprintf(stderr, "\n\n"); +        fprintf(stderr, "%lu DAB frames encoded\n", m.framecount); +        fprintf(stderr, "%f seconds encoded\n", (float)m.framecount * 0.024f); + +        fprintf(stderr, "\nCleaning flowgraph...\n"); + +        m.data.setLength(0); +    } + +    //////////////////////////////////////////////////////////////////////// +    // Cleaning things +    //////////////////////////////////////////////////////////////////////// + +    logger.level(info) << "Terminating"; +    return ret; +} + +run_modulator_state run_modulator(modulator_data& m) +{ +    run_modulator_state ret = MOD_FAILURE;      try {          while (running) { @@ -759,26 +829,26 @@ int main(int argc, char* argv[])              PDEBUG("*****************************************\n");              PDEBUG("* Starting main loop\n");              PDEBUG("*****************************************\n"); -            while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { +            while ((framesize = m.inputReader->GetNextFrame(m.data.getData())) > 0) {                  if (!running) {                      break;                  } -                frame++; +                m.framecount++;                  PDEBUG("*****************************************\n"); -                PDEBUG("* Read frame %lu\n", frame); +                PDEBUG("* Read frame %lu\n", m.framecount);                  PDEBUG("*****************************************\n");                  //////////////////////////////////////////////////////////////// -                // Proccessing data +                // Processing data                  //////////////////////////////////////////////////////////////// -                flowgraph->run(); +                m.flowgraph->run();                  /* Check every once in a while if the remote control                   * is still working */ -                if (rcs.get_no_controllers() > 0 && (frame % 250) == 0) { -                    rcs.check_faults(); +                if (m.rcs->get_no_controllers() > 0 && (m.framecount % 250) == 0) { +                    m.rcs->check_faults();                  }              }              if (framesize == 0) { @@ -788,24 +858,17 @@ int main(int argc, char* argv[])                  fprintf(stderr, "Input read error.\n");              }              running = 0; +            ret = MOD_NORMAL_END;          } +    } catch (std::overflow_error& e) { +        // The ZeroMQ input has overflowed its buffer +        fprintf(stderr, "overflow error: %s\n", e.what()); +        ret = MOD_AGAIN;      } catch (std::exception& e) {          fprintf(stderr, "EXCEPTION: %s\n", e.what()); -        ret = -1; +        ret = MOD_FAILURE;      } -    //////////////////////////////////////////////////////////////////////// -    // Cleaning things -    //////////////////////////////////////////////////////////////////////// -    fprintf(stderr, "\n\n"); -    fprintf(stderr, "%lu DAB frames encoded\n", frame); -    fprintf(stderr, "%f seconds encoded\n", (float)frame * 0.024f); - -    // Cif -    fprintf(stderr, "\nCleaning buffers...\n"); - -    logger.level(info) << "Terminating"; -      return ret;  } diff --git a/src/InputReader.h b/src/InputReader.h index 3e3e000..ee7d657 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -138,6 +138,8 @@ struct InputZeroMQThreadData      ThreadsafeQueue<uint8_t*> *in_messages;      std::string uri;      unsigned max_queued_frames; + +    bool running;  };  class InputZeroMQWorker diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 01d8720..5fab447 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -80,6 +80,10 @@ int InputZeroMQReader::GetNextFrame(void* buffer)      uint8_t* incoming;      in_messages_.wait_and_pop(incoming); +    if (! workerdata_.running) { +        throw std::overflow_error("InputZeroMQ worker dead"); +    } +      memcpy(buffer, incoming, framesize);      delete incoming; @@ -174,6 +178,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                      fprintf(stderr, "ZeroMQ buffer overfull !\n");                      buffer_full = true; +                    throw std::runtime_error("ZMQ input full");                  }                  queue_size = workerdata->in_messages->size(); @@ -195,15 +200,21 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)      catch (zmq::error_t& err) {          fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());      } +    catch (std::exception& err) { +    }      fprintf(stderr, "ZeroMQ input worker terminated\n");      subscriber.close(); + +    workerdata->running = false; +    workerdata->in_messages->notify();  }  void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)  {      running = true; +    workerdata->running = true;      recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);  } | 
