/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
Copyrigth (C) 2013, 2014
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see .
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#if defined(HAVE_INPUT_ZEROMQ)
#include
#include
#include
#include
#include "zmq.hpp"
#include
#include "porting.h"
#include "InputReader.h"
#include "PcDebug.h"
#define MAX_QUEUE_SIZE 50
int InputZeroMQReader::Open(std::string uri)
{
uri_ = uri;
workerdata_.uri = uri;
// launch receiver thread
worker_.Start(&workerdata_);
return 0;
}
int InputZeroMQReader::GetNextFrame(void* buffer)
{
zmq::message_t* incoming;
in_messages_.wait_and_pop(incoming);
size_t framesize = incoming->size();
// guarantee that we never will write more than 6144 bytes
if (framesize > 6144) {
fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize);
logger_.level(error) << "ZeroMQ message too large" << framesize;
return -1;
}
memcpy(buffer, incoming->data(), framesize);
delete incoming;
// pad to 6144 bytes
memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize);
return 6144;
}
void InputZeroMQReader::PrintInfo()
{
fprintf(stderr, "Input ZeroMQ:\n");
fprintf(stderr, " Receiving from %s\n\n", uri_.c_str());
}
// ------------- Worker functions
void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
{
size_t queue_size = 0;
bool buffer_full = false;
zmq::socket_t subscriber(zmqcontext, ZMQ_SUB);
// zmq sockets are not thread safe. That's why
// we create it here, and not at object creation.
try {
subscriber.connect(workerdata->uri.c_str());
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
while (running)
{
zmq::message_t incoming;
subscriber.recv(&incoming);
if (m_to_drop) {
queue_size = workerdata->in_messages->size();
if (queue_size > 4) {
workerdata->in_messages->notify();
}
m_to_drop--;
}
else if (queue_size < MAX_QUEUE_SIZE) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);
buffer_full = false;
}
zmq::message_t* holder = new zmq::message_t();
holder->move(&incoming); // move the message into the holder
queue_size = workerdata->in_messages->push(holder);
}
else {
workerdata->in_messages->notify();
if (!buffer_full) {
fprintf(stderr, "ZeroMQ buffer overfull !\n");
buffer_full = true;
}
queue_size = workerdata->in_messages->size();
/* Drop three more incoming ETI frames before
* we start accepting them again, to guarantee
* that we keep transmission frame vs. ETI frame
* phase.
*/
m_to_drop = 3;
}
if (queue_size < 5) {
fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n",
queue_size);
}
}
}
catch (zmq::error_t& err) {
fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
fprintf(stderr, "ZeroMQ input worker terminated\n");
subscriber.close();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
{
running = true;
recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
}
void InputZeroMQWorker::Stop()
{
running = false;
zmqcontext.close();
recv_thread.join();
}
#endif