summaryrefslogtreecommitdiffstats
path: root/src/dabInputZmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dabInputZmq.cpp')
-rw-r--r--src/dabInputZmq.cpp58
1 files changed, 47 insertions, 11 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index c883f35..387c8cc 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
+ Copyright (C) 2013, 2014 Matthias P. Braendli
http://mpb.li
ZeroMQ input. see www.zeromq.org for more info
@@ -40,6 +40,7 @@
#include "dabInputZmq.h"
#include "StatsServer.h"
#include "zmq.hpp"
+#include "PcDebug.h"
#ifdef HAVE_CONFIG_H
# include "config.h"
@@ -47,10 +48,11 @@
#ifdef HAVE_INPUT_ZEROMQ
-#include <stdio.h>
+#include <cstdio>
+#include <cstdlib>
#include <list>
#include <exception>
-#include <string.h>
+#include <cstring>
#include <string>
#include <sstream>
#include <limits.h>
@@ -115,14 +117,14 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
rc = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
- if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() >= m_frame_buffer_limit) {
global_stats->notifyOverrun(m_name);
/* If the buffer is really too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length.
*/
- if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) {
size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
while (over_max--) {
@@ -217,19 +219,22 @@ int DabInputZmqMPEG::readFromSocket(int framesize)
if (msg.size() == framesize)
{
- if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() > m_frame_buffer_limit) {
etiLog.level(warn) <<
"inputZMQ " << m_name <<
" buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming frame !";
messageReceived = 0;
}
- else {
+ else if (m_enable_input) {
// copy the input frame blockwise into the frame_buffer
char* frame = new char[framesize];
memcpy(frame, data, framesize);
m_frame_buffer.push_back(frame);
}
+ else {
+ return 0;
+ }
}
else {
etiLog.level(error) <<
@@ -273,14 +278,14 @@ int DabInputZmqAAC::readFromSocket(int framesize)
*/
if (msg.size() == 5*framesize)
{
- if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() > m_frame_buffer_limit) {
etiLog.level(warn) <<
"inputZMQ " << m_name <<
" buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming superframe !";
messageReceived = 0;
}
- else {
+ else if (m_enable_input) {
// copy the input frame blockwise into the frame_buffer
for (char* framestart = data;
framestart < &data[5*framesize];
@@ -290,6 +295,9 @@ int DabInputZmqAAC::readFromSocket(int framesize)
m_frame_buffer.push_back(frame);
}
}
+ else {
+ return 0;
+ }
}
else {
etiLog.level(error) <<
@@ -309,7 +317,29 @@ void DabInputZmqBase::set_parameter(string parameter, string value)
ss.exceptions ( stringstream::failbit | stringstream::badbit );
if (parameter == "buffer") {
- throw ParameterError("Parameter 'buffer' is read-only");
+ size_t new_limit = atol(value.c_str());
+
+ if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too small."
+ " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) );
+ }
+ else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too large."
+ " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
+ }
+
+ m_frame_buffer_limit = new_limit;
+ }
+ else if (parameter == "enable") {
+ if (value == "1") {
+ m_enable_input = true;
+ }
+ else if (value == "0") {
+ m_enable_input = false;
+ }
+ else {
+ throw ParameterError("Value not understood, specify 0 or 1.");
+ }
}
else {
stringstream ss;
@@ -323,7 +353,13 @@ string DabInputZmqBase::get_parameter(string parameter)
{
stringstream ss;
if (parameter == "buffer") {
- ss << INPUT_ZMQ_MAX_BUFFER_SIZE;
+ ss << m_frame_buffer_limit;
+ }
+ else if (parameter == "enable") {
+ if (m_enable_input)
+ ss << "true";
+ else
+ ss << "false";
}
else {
ss << "Parameter '" << parameter <<