aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 11:53:15 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 11:53:15 +0100
commitea5594186bafa5489d6086a26d71b8f3d1ccf9cd (patch)
treea307b0882a867b415c68cd7d644241abe0c971e1 /src
parentf908d28e72887b68391a246ceb328cb52dcb2aaa (diff)
downloaddabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.gz
dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.bz2
dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.zip
Add threaded UDP input for EDI
Diffstat (limited to 'src')
-rw-r--r--src/DabMod.cpp9
-rw-r--r--src/EtiReader.cpp38
-rw-r--r--src/EtiReader.h35
-rw-r--r--src/InputReader.h1
-rw-r--r--src/Utils.cpp3
5 files changed, 45 insertions, 41 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 8065a5a..ac7842f 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -627,6 +627,9 @@ int launch_modulator(int argc, char* argv[])
else if (inputName.substr(0, 6) == "tcp://") {
inputTransport = "tcp";
}
+ else if (inputName.substr(0, 6) == "udp://") {
+ inputTransport = "edi";
+ }
}
else {
inputName = "/dev/stdin";
@@ -784,7 +787,11 @@ int launch_modulator(int argc, char* argv[])
}
set_thread_name("modulator");
- if (ediUdpInput.isEnabled()) {
+ if (inputTransport == "edi") {
+ if (not ediUdpInput.isEnabled()) {
+ etiLog.level(error) << "inputTransport is edi, but ediUdpInput is not enabled";
+ return -1;
+ }
Flowgraph flowgraph;
auto modulator = make_shared<DabModulator>(
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index cc7b004..a1053c6 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -508,49 +508,31 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) :
m_port(0),
m_decoder(decoder) { }
-int EdiUdpInput::Open(const std::string& uri)
+void EdiUdpInput::Open(const std::string& uri)
{
etiLog.level(info) << "Opening EDI :" << uri;
- int ret = 1;
-
const std::regex re_udp("udp://:([0-9]+)");
std::smatch m;
if (std::regex_match(uri, m, re_udp)) {
m_port = std::stoi(m[1].str());
etiLog.level(info) << "EDI port :" << m_port;
- ret = m_sock.reinit(m_port, "0.0.0.0");
- m_enabled = (ret == 0);
+ m_udp_rx.start(m_port);
+ m_enabled = true;
}
-
- return ret;
}
bool EdiUdpInput::rxPacket()
{
- const size_t packsize = 8192;
- UdpPacket packet(packsize);
-
- int ret = m_sock.receive(packet);
- if (ret == 0) {
- const auto &buf = packet.getBuffer();
- if (packet.getSize() == packsize) {
- etiLog.log(warn, "Warning, possible UDP truncation");
- }
-
- m_decoder.push_packet(buf);
+ try {
+ auto udp_data = m_udp_rx.get_packet_buffer();
+ m_decoder.push_packet(udp_data);
return true;
}
- else {
- if (inetErrNo == EINTR) {
- return false;
- }
- else {
- stringstream ss;
- ss << "EDI UDP Socket error: " << inetErrMsg;
- throw std::runtime_error(ss.str());
- }
+ catch (std::runtime_error& e) {
+ etiLog.level(warn) << "EDI input: " << e.what();
+ return false;
}
}
diff --git a/src/EtiReader.h b/src/EtiReader.h
index cd04a16..1b75025 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -38,29 +38,40 @@
#include "TimestampDecoder.h"
#include "lib/edi/ETIDecoder.hpp"
#include "lib/UdpSocket.h"
+#include "ThreadsafeQueue.h"
#include <vector>
#include <memory>
#include <stdint.h>
#include <sys/types.h>
-
+/* The modulator uses this interface to get the necessary multiplex data,
+ * either from an ETI or an EDI source.
+ */
class EtiSource
{
public:
+ /* Get the DAB Transmission Mode. Valid values: 1, 2, 3 or 4 */
virtual unsigned getMode() = 0;
+
+ /* Get the current Frame Phase */
virtual unsigned getFp() = 0;
+ /* Returns true if we have valid time stamps in the ETI*/
virtual bool sourceContainsTimestamp() = 0;
virtual void calculateTimestamp(struct frame_timestamp& ts) = 0;
+ /* Return the FIC source to be used for modulation */
virtual std::shared_ptr<FicSource>& getFic(void);
+
+ /* Return all subchannel sources containing MST data */
virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const = 0;
protected:
std::shared_ptr<FicSource> myFicSource;
};
+/* The EtiReader extracts the necessary data for modulation from an ETI(NI) byte stream. */
class EtiReader : public EtiSource
{
public:
@@ -72,25 +83,22 @@ public:
virtual unsigned getFp();
/* Read ETI data from dataIn. Returns the number of bytes
- * read from the buffer
+ * read from the buffer.
*/
int loadEtiData(const Buffer& dataIn);
+ virtual bool sourceContainsTimestamp();
virtual void calculateTimestamp(struct frame_timestamp& ts)
{
myTimestampDecoder.calculateTimestamp(ts);
}
- /* Returns true if we have valid time stamps in the ETI*/
- virtual bool sourceContainsTimestamp();
-
virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const;
private:
/* Transform the ETI TIST to a PPS offset in units of 1/16384000 s */
uint32_t getPPSOffset();
- void sync();
int state;
uint32_t nb_frames;
uint16_t framesize;
@@ -108,6 +116,9 @@ private:
std::vector<std::shared_ptr<SubchannelSource> > mySources;
};
+/* The EdiReader extracts the necessary data using the EDI input library in
+ * lib/edi
+ */
class EdiReader : public EtiSource, public EdiDecoder::DataCollector
{
public:
@@ -147,7 +158,7 @@ public:
virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc);
- // Tell the ETIWriter that the AFPacket is complete
+ // Gets called by the EDI library to tell us that all data for a frame was given to us
virtual void assemble(void);
private:
bool m_proto_valid = false;
@@ -172,11 +183,15 @@ private:
std::map<uint8_t, std::shared_ptr<SubchannelSource> > m_sources;
};
+/* The EDI input does not use the inputs defined in InputReader.h, as they were designed
+ * for ETI. It uses the EdiUdpInput which in turn uses a threaded receiver.
+ */
+
class EdiUdpInput {
public:
EdiUdpInput(EdiDecoder::ETIDecoder& decoder);
- int Open(const std::string& uri);
+ void Open(const std::string& uri);
bool isEnabled(void) const { return m_enabled; }
@@ -190,7 +205,7 @@ class EdiUdpInput {
bool m_enabled;
int m_port;
- UdpSocket m_sock;
+ UdpReceiver m_udp_rx;
EdiDecoder::ETIDecoder& m_decoder;
};
diff --git a/src/InputReader.h b/src/InputReader.h
index f0e7197..7d6b373 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -40,7 +40,6 @@
#endif
#include "porting.h"
#include "Log.h"
-#include "lib/edi/ETIDecoder.hpp"
#include "lib/UdpSocket.h"
#include <sys/socket.h>
#include <netinet/in.h>
diff --git a/src/Utils.cpp b/src/Utils.cpp
index a91077c..b93f2c1 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2015
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -64,6 +64,7 @@ void printUsage(char* progName)
fprintf(out, "input: ETI input filename (default: stdin), or\n");
fprintf(out, " tcp://source:port for ETI-over-TCP input, or\n");
fprintf(out, " zmq+tcp://source:port for ZMQ input.\n");
+ fprintf(out, " udp://:port for EDI input.\n");
fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n");
fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n");
fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n");