summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/InputReader.h62
-rw-r--r--src/InputZeroMQReader.cpp86
2 files changed, 58 insertions, 90 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index c897c2d..07326cf 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -162,65 +162,37 @@ struct zmq_input_overflow : public std::exception
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
-struct InputZeroMQThreadData
-{
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages;
- std::string uri;
- size_t max_queued_frames;
-};
-
-class InputZeroMQWorker
+class InputZeroMQReader : public InputReader
{
public:
- void Start(struct InputZeroMQThreadData* workerdata);
- void Stop();
- bool is_running(void) const { return running; }
+ InputZeroMQReader() = default;
+ InputZeroMQReader(const InputZeroMQReader& other) = delete;
+ InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
+ ~InputZeroMQReader();
+
+ int Open(const std::string& uri, size_t max_queued_frames);
+ int GetNextFrame(void* buffer);
+ void PrintInfo() const;
private:
- std::atomic<bool> running = ATOMIC_VAR_INIT(false);
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ std::string m_uri;
+ size_t m_max_queued_frames = 0;
+ ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages;
- void RecvProcess(struct InputZeroMQThreadData* workerdata);
+ void RecvProcess(void);
- zmq::context_t zmqcontext; // is thread-safe
- boost::thread recv_thread;
+ zmq::context_t m_zmqcontext; // is thread-safe
+ boost::thread m_recv_thread;
/* We must be careful to keep frame phase consistent. If we
* drop a single ETI frame, we will break the transmission
* frame vs. ETI frame phase.
*
- * Here we keep track of how many ETI frames we must drop
+ * Here we keep track of how many ETI frames we must drop.
*/
int m_to_drop = 0;
};
-class InputZeroMQReader : public InputReader
-{
- public:
- InputZeroMQReader()
- {
- workerdata_.in_messages = &in_messages_;
- }
-
- ~InputZeroMQReader()
- {
- worker_.Stop();
- }
-
- int Open(const std::string& uri, size_t max_queued_frames);
-
- int GetNextFrame(void* buffer);
-
- void PrintInfo() const;
-
- private:
- InputZeroMQReader(const InputZeroMQReader& other) = delete;
- InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
- std::string uri_;
-
- InputZeroMQWorker worker_;
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > in_messages_;
- struct InputZeroMQThreadData workerdata_;
-};
-
#endif
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 5d0e513..aa342d5 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -68,20 +68,28 @@ struct zmq_dab_message_t
#define ZMQ_DAB_MESSAGE_T_HEADERSIZE \
(sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t))
+InputZeroMQReader::~InputZeroMQReader()
+{
+ m_running = false;
+ m_zmqcontext.close();
+ if (m_recv_thread.joinable()) {
+ m_recv_thread.join();
+ }
+}
+
int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
{
// The URL might start with zmq+tcp://
if (uri.substr(0, 4) == "zmq+") {
- uri_ = uri.substr(4);
+ m_uri = uri.substr(4);
}
else {
- uri_ = uri;
+ m_uri = uri;
}
- workerdata_.uri = uri_;
- workerdata_.max_queued_frames = max_queued_frames;
- // launch receiver thread
- worker_.Start(&workerdata_);
+ m_max_queued_frames = max_queued_frames;
+
+ m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this);
return 0;
}
@@ -90,7 +98,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
{
const size_t framesize = 6144;
- if (not worker_.is_running()) {
+ if (not m_running) {
return 0;
}
@@ -100,18 +108,18 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
* (4 ETI frames in TM1) and we should make sure that
* we can serve the data required for a full transmission frame.
*/
- if (in_messages_.size() < 4) {
+ if (m_in_messages.size() < 4) {
const size_t prebuffering = 10;
etiLog.log(trace, "ZMQ,wait1");
- in_messages_.wait_and_pop(incoming, prebuffering);
+ m_in_messages.wait_and_pop(incoming, prebuffering);
}
else {
etiLog.log(trace, "ZMQ,wait2");
- in_messages_.wait_and_pop(incoming);
+ m_in_messages.wait_and_pop(incoming);
}
etiLog.log(trace, "ZMQ,pop");
- if (not worker_.is_running()) {
+ if (not m_running) {
throw zmq_input_overflow();
}
@@ -123,54 +131,55 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
void InputZeroMQReader::PrintInfo() const
{
fprintf(stderr, "Input ZeroMQ:\n");
- fprintf(stderr, " Receiving from %s\n\n", uri_.c_str());
+ fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str());
}
-// ------------- Worker functions
-
-void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
+void InputZeroMQReader::RecvProcess()
{
set_thread_name("zmqinput");
- size_t queue_size = 0;
+ m_running = true;
+ size_t queue_size = 0;
bool buffer_full = false;
- zmq::socket_t subscriber(zmqcontext, ZMQ_SUB);
+ zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);
// zmq sockets are not thread safe. That's why
// we create it here, and not at object creation.
bool success = true;
try {
- subscriber.connect(workerdata->uri.c_str());
+ subscriber.connect(m_uri.c_str());
}
catch (zmq::error_t& err) {
- etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << workerdata->uri << "': '" << err.what() << "'";
+ etiLog.level(error) << "Failed to connect ZeroMQ socket to '" <<
+ m_uri << "': '" << err.what() << "'";
success = false;
}
if (success) try {
- subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+ // subscribe to all messages
+ subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
catch (zmq::error_t& err) {
- etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'";
+ etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" <<
+ err.what() << "'";
success = false;
}
if (success) try {
- while (running)
- {
+ while (m_running) {
zmq::message_t incoming;
subscriber.recv(&incoming);
if (m_to_drop) {
- queue_size = workerdata->in_messages->size();
+ queue_size = m_in_messages.size();
if (queue_size > 4) {
- workerdata->in_messages->notify();
+ m_in_messages.notify();
}
m_to_drop--;
}
- else if (queue_size < workerdata->max_queued_frames) {
+ else if (queue_size < m_max_queued_frames) {
if (buffer_full) {
etiLog.level(info) << "ZeroMQ buffer recovered: " <<
queue_size << " elements";
@@ -214,14 +223,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
offset += framesize;
- queue_size = workerdata->in_messages->push(buf);
+ queue_size = m_in_messages.push(buf);
etiLog.log(trace, "ZMQ,push %zu", queue_size);
}
}
}
}
else {
- workerdata->in_messages->notify();
+ m_in_messages.notify();
if (!buffer_full) {
etiLog.level(warn) << "ZeroMQ buffer overfull !";
@@ -230,7 +239,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
throw runtime_error("ZMQ input full");
}
- queue_size = workerdata->in_messages->size();
+ queue_size = m_in_messages.size();
/* Drop three more incoming ETI frames before
* we start accepting them again, to guarantee
@@ -256,21 +265,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
subscriber.close();
- running = false;
- workerdata->in_messages->notify();
-}
-
-void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
-{
- running = true;
- recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
-}
-
-void InputZeroMQWorker::Stop()
-{
- running = false;
- zmqcontext.close();
- recv_thread.join();
+ m_running = false;
+ m_in_messages.notify();
}
#endif