/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
Copyrigth (C) 2013, 2014
Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see .
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#if defined(HAVE_INPUT_ZEROMQ)
#include
#include
#include
#include
#include "zmq.hpp"
#include
#include "porting.h"
#include "InputReader.h"
#include "PcDebug.h"
#define MAX_QUEUE_SIZE 50
#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
/* A concatenation of four ETI frames,
* whose maximal size is 6144.
*
* Four frames in one zmq message are sent, so that
* we do not risk breaking ETI vs. transmission frame
* phase.
*
* The frames are concatenated in buf, and
* their sizes is given in the buflen array.
*
* Most of the time, the buf will not be completely
* filled
*/
struct zmq_dab_message_t
{
uint32_t version;
uint16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE];
uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
};
int InputZeroMQReader::Open(std::string uri)
{
uri_ = uri;
workerdata_.uri = uri;
// launch receiver thread
worker_.Start(&workerdata_);
return 0;
}
int InputZeroMQReader::GetNextFrame(void* buffer)
{
const size_t framesize = 6144;
uint8_t* incoming;
in_messages_.wait_and_pop(incoming);
memcpy(buffer, incoming, framesize);
delete incoming;
return framesize;
}
void InputZeroMQReader::PrintInfo()
{
fprintf(stderr, "Input ZeroMQ:\n");
fprintf(stderr, " Receiving from %s\n\n", uri_.c_str());
}
// ------------- Worker functions
void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
{
size_t queue_size = 0;
bool buffer_full = false;
zmq::socket_t subscriber(zmqcontext, ZMQ_SUB);
// zmq sockets are not thread safe. That's why
// we create it here, and not at object creation.
try {
subscriber.connect(workerdata->uri.c_str());
subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
while (running)
{
zmq::message_t incoming;
subscriber.recv(&incoming);
if (m_to_drop) {
queue_size = workerdata->in_messages->size();
if (queue_size > 4) {
workerdata->in_messages->notify();
}
m_to_drop--;
}
else if (queue_size < MAX_QUEUE_SIZE) {
if (buffer_full) {
fprintf(stderr, "ZeroMQ buffer recovered: %zu elements\n",
queue_size);
buffer_full = false;
}
zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
if (dab_msg->version != 1) {
fprintf(stderr, "ZeroMQ input: wrong packet version %d\n",
dab_msg->version);
}
int offset = sizeof(dab_msg->version) +
NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++)
{
if (dab_msg->buflen[i] <= 0 ||
dab_msg->buflen[i] > 6144)
{
fprintf(stderr, "ZeroMQ buffer %d: invalid length %d\n",
i, dab_msg->buflen[i]);
// TODO error handling
}
else {
uint8_t* buf = new uint8_t[6144];
const int framesize = dab_msg->buflen[i];
memcpy(buf,
((uint8_t*)incoming.data()) + offset,
framesize);
// pad to 6144 bytes
memset(&((uint8_t*)buf)[framesize],
0x55, 6144 - framesize);
offset += framesize;
queue_size = workerdata->in_messages->push(buf);
}
}
}
else {
workerdata->in_messages->notify();
if (!buffer_full) {
fprintf(stderr, "ZeroMQ buffer overfull !\n");
buffer_full = true;
}
queue_size = workerdata->in_messages->size();
/* Drop three more incoming ETI frames before
* we start accepting them again, to guarantee
* that we keep transmission frame vs. ETI frame
* phase.
*/
m_to_drop = 3;
}
if (queue_size < 5) {
fprintf(stderr, "ZeroMQ buffer underfull: %zu elements !\n",
queue_size);
}
}
}
catch (zmq::error_t& err) {
fprintf(stderr, "ZeroMQ error in RecvProcess: '%s'\n", err.what());
}
fprintf(stderr, "ZeroMQ input worker terminated\n");
subscriber.close();
}
void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
{
running = true;
recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
}
void InputZeroMQWorker::Stop()
{
running = false;
zmqcontext.close();
recv_thread.join();
}
#endif