summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-06 17:08:17 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-06 17:08:17 +0100
commitf81473031810253f5b78fd02c2df04a5a204099b (patch)
tree720cb055951f1351412ac5a838d8b136bcca09a4
parent2dac8f5fa6d63a71a726ec373af9bf45f22de8b7 (diff)
downloaddabmod-f81473031810253f5b78fd02c2df04a5a204099b.tar.gz
dabmod-f81473031810253f5b78fd02c2df04a5a204099b.tar.bz2
dabmod-f81473031810253f5b78fd02c2df04a5a204099b.zip
Fix ETI-over-TCP and ZeroMQ inputs
-rw-r--r--src/DabMod.cpp21
-rw-r--r--src/EtiReader.cpp30
-rw-r--r--src/EtiReader.h6
3 files changed, 35 insertions, 22 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index a5c0de6..987b579 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -88,16 +88,18 @@ void signalHandler(int signalNb)
struct modulator_data
{
modulator_data() :
- inputReader(NULL),
+ inputReader(nullptr),
framecount(0),
- flowgraph(NULL),
- rcs(NULL) {}
+ flowgraph(nullptr),
+ etiReader(nullptr),
+ rcs(nullptr) {}
InputReader* inputReader;
Buffer data;
uint64_t framecount;
Flowgraph* flowgraph;
+ EtiReader* etiReader;
RemoteControllers* rcs;
};
@@ -691,7 +693,6 @@ int launch_modulator(int argc, char* argv[])
}
- EtiReader etiReader(tist_offset_s, tist_delay_stages);
EdiReader ediReader;
EdiDecoder::ETIDecoder ediInput(ediReader);
EdiUdpInput ediUdpInput(ediInput);
@@ -799,7 +800,7 @@ int launch_modulator(int argc, char* argv[])
#endif
size_t framecount = 0;
- bool running = true;
+
while (running) {
while (not ediReader.isFrameReady()) {
bool success = ediUdpInput.rxPacket();
@@ -826,6 +827,9 @@ int launch_modulator(int argc, char* argv[])
m.flowgraph = &flowgraph;
m.data.setLength(6144);
+ EtiReader etiReader(tist_offset_s, tist_delay_stages);
+ m.etiReader = &etiReader;
+
auto input = make_shared<InputMemory>(&m.data);
auto modulator = make_shared<DabModulator>(
etiReader, tiiConfig, outputRate, clockRate,
@@ -911,7 +915,6 @@ int launch_modulator(int argc, char* argv[])
run_modulator_state_t run_modulator(modulator_data& m)
{
-
auto ret = run_modulator_state_t::failure;
try {
while (running) {
@@ -932,6 +935,12 @@ run_modulator_state_t run_modulator(modulator_data& m)
PDEBUG("* Read frame %lu\n", m.framecount);
PDEBUG("*****************************************\n");
+ const int eti_bytes_read = m.etiReader->loadEtiData(m.data);
+ if ((size_t)eti_bytes_read != m.data.getLength()) {
+ etiLog.level(error) << "ETI frame incompletely read";
+ throw std::runtime_error("ETI read error");
+ }
+
m.flowgraph->run();
/* Check every once in a while if the remote control
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index e646392..cc7b004 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -94,18 +94,18 @@ const std::vector<std::shared_ptr<SubchannelSource> > EtiReader::getSubchannels(
}
-int EtiReader::process(const Buffer* dataIn)
+int EtiReader::loadEtiData(const Buffer& dataIn)
{
- PDEBUG("EtiReader::process(dataIn: %p)\n", dataIn);
+ PDEBUG("EtiReader::loadEtiData(dataIn: %p)\n", &dataIn);
PDEBUG(" state: %u\n", state);
- const unsigned char* in = reinterpret_cast<const unsigned char*>(dataIn->getData());
- size_t input_size = dataIn->getLength();
+ const unsigned char* in = reinterpret_cast<const unsigned char*>(dataIn.getData());
+ size_t input_size = dataIn.getLength();
while (input_size > 0) {
switch (state) {
case EtiReaderStateNbFrame:
if (input_size < 4) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
nb_frames = *(uint32_t*)in;
input_size -= 4;
@@ -115,7 +115,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateFrameSize:
if (input_size < 2) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
framesize = *(uint16_t*)in;
input_size -= 2;
@@ -125,7 +125,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateSync:
if (input_size < 4) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
framesize = 6144;
memcpy(&eti_sync, in, 4);
@@ -138,7 +138,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateFc:
if (input_size < 4) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
memcpy(&eti_fc, in, 4);
eti_fc_valid = true;
@@ -163,7 +163,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateNst:
if (input_size < 4 * (size_t)eti_fc.NST) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
if ((eti_stc.size() != eti_fc.NST) ||
(memcmp(&eti_stc[0], in, 4 * eti_fc.NST))) {
@@ -193,7 +193,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateEoh:
if (input_size < 4) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
memcpy(&eti_eoh, in, 4);
input_size -= 4;
@@ -206,7 +206,7 @@ int EtiReader::process(const Buffer* dataIn)
case EtiReaderStateFic:
if (eti_fc.MID == 3) {
if (input_size < 128) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
PDEBUG("Writting 128 bytes of FIC channel data\n");
Buffer fic = Buffer(128, in);
@@ -216,7 +216,7 @@ int EtiReader::process(const Buffer* dataIn)
in += 128;
} else {
if (input_size < 96) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
PDEBUG("Writting 96 bytes of FIC channel data\n");
Buffer fic = Buffer(96, in);
@@ -241,7 +241,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateEof:
if (input_size < 4) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
memcpy(&eti_eof, in, 4);
input_size -= 4;
@@ -253,7 +253,7 @@ int EtiReader::process(const Buffer* dataIn)
break;
case EtiReaderStateTist:
if (input_size < 4) {
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
memcpy(&eti_tist, in, 4);
input_size -= 4;
@@ -282,7 +282,7 @@ int EtiReader::process(const Buffer* dataIn)
myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,
eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT);
- return dataIn->getLength() - input_size;
+ return dataIn.getLength() - input_size;
}
bool EtiReader::sourceContainsTimestamp()
diff --git a/src/EtiReader.h b/src/EtiReader.h
index 78f0d3d..cd04a16 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -70,7 +70,11 @@ public:
virtual unsigned getMode();
virtual unsigned getFp();
- int process(const Buffer* dataIn);
+
+ /* Read ETI data from dataIn. Returns the number of bytes
+ * read from the buffer
+ */
+ int loadEtiData(const Buffer& dataIn);
virtual void calculateTimestamp(struct frame_timestamp& ts)
{