aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/UdpSocket.cpp62
-rw-r--r--lib/UdpSocket.h29
-rw-r--r--src/DabMod.cpp9
-rw-r--r--src/EtiReader.cpp38
-rw-r--r--src/EtiReader.h35
-rw-r--r--src/InputReader.h1
-rw-r--r--src/Utils.cpp3
7 files changed, 134 insertions, 43 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
index 981d713..b88c731 100644
--- a/lib/UdpSocket.cpp
+++ b/lib/UdpSocket.cpp
@@ -167,8 +167,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
/**
* Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
+ * @param groupname The multicast address to join.
* @return 0 if ok, -1 if error
*/
int UdpSocket::joinGroup(char* groupname)
@@ -254,3 +253,62 @@ InetAddress UdpPacket::getAddress()
return address;
}
+UdpReceiver::~UdpReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UdpReceiver::start(int port) {
+ m_port = port;
+ m_thread = std::thread(&UdpReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UdpReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UdpPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.getBuffer();
+}
+
+void UdpReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ m_sock.reinit(m_port, "0.0.0.0");
+
+ const size_t packsize = 8192;
+ UdpPacket packet(packsize);
+
+ while (not m_stop) {
+ int ret = m_sock.receive(packet);
+ if (ret == 0) {
+ if (packet.getSize() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ m_packets.push(packet);
+ }
+ else
+ {
+ if (inetErrNo != EINTR) {
+ // TODO replace fprintf
+ fprintf(stderr, "Socket error: %s\n", inetErrMsg);
+ }
+ m_stop = true;
+ }
+ }
+}
+
diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h
index f51e87c..81a7d2b 100644
--- a/lib/UdpSocket.h
+++ b/lib/UdpSocket.h
@@ -31,6 +31,7 @@
#endif
#include "InetAddress.h"
+#include "ThreadsafeQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
@@ -45,6 +46,8 @@
#include <stdlib.h>
#include <iostream>
#include <vector>
+#include <thread>
+#include <atomic>
class UdpPacket;
@@ -172,3 +175,29 @@ class UdpPacket
InetAddress address;
};
+/* Threaded UDP receiver */
+class UdpReceiver {
+ public:
+ UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
+ ~UdpReceiver();
+ UdpReceiver(const UdpReceiver&) = delete;
+ UdpReceiver operator=(const UdpReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port;
+ std::thread m_thread;
+ std::atomic<bool> m_stop;
+ ThreadsafeQueue<UdpPacket> m_packets;
+ UdpSocket m_sock;
+};
+
+
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 8065a5a..ac7842f 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -627,6 +627,9 @@ int launch_modulator(int argc, char* argv[])
else if (inputName.substr(0, 6) == "tcp://") {
inputTransport = "tcp";
}
+ else if (inputName.substr(0, 6) == "udp://") {
+ inputTransport = "edi";
+ }
}
else {
inputName = "/dev/stdin";
@@ -784,7 +787,11 @@ int launch_modulator(int argc, char* argv[])
}
set_thread_name("modulator");
- if (ediUdpInput.isEnabled()) {
+ if (inputTransport == "edi") {
+ if (not ediUdpInput.isEnabled()) {
+ etiLog.level(error) << "inputTransport is edi, but ediUdpInput is not enabled";
+ return -1;
+ }
Flowgraph flowgraph;
auto modulator = make_shared<DabModulator>(
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index cc7b004..a1053c6 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -508,49 +508,31 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) :
m_port(0),
m_decoder(decoder) { }
-int EdiUdpInput::Open(const std::string& uri)
+void EdiUdpInput::Open(const std::string& uri)
{
etiLog.level(info) << "Opening EDI :" << uri;
- int ret = 1;
-
const std::regex re_udp("udp://:([0-9]+)");
std::smatch m;
if (std::regex_match(uri, m, re_udp)) {
m_port = std::stoi(m[1].str());
etiLog.level(info) << "EDI port :" << m_port;
- ret = m_sock.reinit(m_port, "0.0.0.0");
- m_enabled = (ret == 0);
+ m_udp_rx.start(m_port);
+ m_enabled = true;
}
-
- return ret;
}
bool EdiUdpInput::rxPacket()
{
- const size_t packsize = 8192;
- UdpPacket packet(packsize);
-
- int ret = m_sock.receive(packet);
- if (ret == 0) {
- const auto &buf = packet.getBuffer();
- if (packet.getSize() == packsize) {
- etiLog.log(warn, "Warning, possible UDP truncation");
- }
-
- m_decoder.push_packet(buf);
+ try {
+ auto udp_data = m_udp_rx.get_packet_buffer();
+ m_decoder.push_packet(udp_data);
return true;
}
- else {
- if (inetErrNo == EINTR) {
- return false;
- }
- else {
- stringstream ss;
- ss << "EDI UDP Socket error: " << inetErrMsg;
- throw std::runtime_error(ss.str());
- }
+ catch (std::runtime_error& e) {
+ etiLog.level(warn) << "EDI input: " << e.what();
+ return false;
}
}
diff --git a/src/EtiReader.h b/src/EtiReader.h
index cd04a16..1b75025 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -38,29 +38,40 @@
#include "TimestampDecoder.h"
#include "lib/edi/ETIDecoder.hpp"
#include "lib/UdpSocket.h"
+#include "ThreadsafeQueue.h"
#include <vector>
#include <memory>
#include <stdint.h>
#include <sys/types.h>
-
+/* The modulator uses this interface to get the necessary multiplex data,
+ * either from an ETI or an EDI source.
+ */
class EtiSource
{
public:
+ /* Get the DAB Transmission Mode. Valid values: 1, 2, 3 or 4 */
virtual unsigned getMode() = 0;
+
+ /* Get the current Frame Phase */
virtual unsigned getFp() = 0;
+ /* Returns true if we have valid time stamps in the ETI*/
virtual bool sourceContainsTimestamp() = 0;
virtual void calculateTimestamp(struct frame_timestamp& ts) = 0;
+ /* Return the FIC source to be used for modulation */
virtual std::shared_ptr<FicSource>& getFic(void);
+
+ /* Return all subchannel sources containing MST data */
virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const = 0;
protected:
std::shared_ptr<FicSource> myFicSource;
};
+/* The EtiReader extracts the necessary data for modulation from an ETI(NI) byte stream. */
class EtiReader : public EtiSource
{
public:
@@ -72,25 +83,22 @@ public:
virtual unsigned getFp();
/* Read ETI data from dataIn. Returns the number of bytes
- * read from the buffer
+ * read from the buffer.
*/
int loadEtiData(const Buffer& dataIn);
+ virtual bool sourceContainsTimestamp();
virtual void calculateTimestamp(struct frame_timestamp& ts)
{
myTimestampDecoder.calculateTimestamp(ts);
}
- /* Returns true if we have valid time stamps in the ETI*/
- virtual bool sourceContainsTimestamp();
-
virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const;
private:
/* Transform the ETI TIST to a PPS offset in units of 1/16384000 s */
uint32_t getPPSOffset();
- void sync();
int state;
uint32_t nb_frames;
uint16_t framesize;
@@ -108,6 +116,9 @@ private:
std::vector<std::shared_ptr<SubchannelSource> > mySources;
};
+/* The EdiReader extracts the necessary data using the EDI input library in
+ * lib/edi
+ */
class EdiReader : public EtiSource, public EdiDecoder::DataCollector
{
public:
@@ -147,7 +158,7 @@ public:
virtual void add_subchannel(const EdiDecoder::eti_stc_data& stc);
- // Tell the ETIWriter that the AFPacket is complete
+ // Gets called by the EDI library to tell us that all data for a frame was given to us
virtual void assemble(void);
private:
bool m_proto_valid = false;
@@ -172,11 +183,15 @@ private:
std::map<uint8_t, std::shared_ptr<SubchannelSource> > m_sources;
};
+/* The EDI input does not use the inputs defined in InputReader.h, as they were designed
+ * for ETI. It uses the EdiUdpInput which in turn uses a threaded receiver.
+ */
+
class EdiUdpInput {
public:
EdiUdpInput(EdiDecoder::ETIDecoder& decoder);
- int Open(const std::string& uri);
+ void Open(const std::string& uri);
bool isEnabled(void) const { return m_enabled; }
@@ -190,7 +205,7 @@ class EdiUdpInput {
bool m_enabled;
int m_port;
- UdpSocket m_sock;
+ UdpReceiver m_udp_rx;
EdiDecoder::ETIDecoder& m_decoder;
};
diff --git a/src/InputReader.h b/src/InputReader.h
index f0e7197..7d6b373 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -40,7 +40,6 @@
#endif
#include "porting.h"
#include "Log.h"
-#include "lib/edi/ETIDecoder.hpp"
#include "lib/UdpSocket.h"
#include <sys/socket.h>
#include <netinet/in.h>
diff --git a/src/Utils.cpp b/src/Utils.cpp
index a91077c..b93f2c1 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2015
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -64,6 +64,7 @@ void printUsage(char* progName)
fprintf(out, "input: ETI input filename (default: stdin), or\n");
fprintf(out, " tcp://source:port for ETI-over-TCP input, or\n");
fprintf(out, " zmq+tcp://source:port for ZMQ input.\n");
+ fprintf(out, " udp://:port for EDI input.\n");
fprintf(out, "-f name: Use file output with given filename. (use /dev/stdout for standard output)\n");
fprintf(out, "-u device: Use UHD output with given device string. (use "" for default device)\n");
fprintf(out, "-F frequency: Set the transmit frequency when using UHD output. (mandatory option when using UHD)\n");