aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-02-14 18:09:29 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-02-14 18:09:29 +0100
commitd6d8d67b1c921d8fca7257291c15c07bdea8d14a (patch)
treeb682e411547bbf83d95c0c54b8e5d7c865d6aaa1
parent355c427917f06debcda701ccb75c8134da225aff (diff)
downloaddabmux-d6d8d67b1c921d8fca7257291c15c07bdea8d14a.tar.gz
dabmux-d6d8d67b1c921d8fca7257291c15c07bdea8d14a.tar.bz2
dabmux-d6d8d67b1c921d8fca7257291c15c07bdea8d14a.zip
Add more ZMQ RC parameters
-rw-r--r--README3
-rw-r--r--src/RemoteControl.cpp3
-rw-r--r--src/RemoteControl.h3
-rw-r--r--src/dabInputZmq.cpp58
-rw-r--r--src/dabInputZmq.h31
5 files changed, 78 insertions, 20 deletions
diff --git a/README b/README
index a917667..acdd124 100644
--- a/README
+++ b/README
@@ -15,7 +15,8 @@ In addition to the features of CRC-DabMux, this fork contains:
- supports logging to syslog
- supports ZMQ input monitoring with munin tool
- supports a Telnet Remote Control for setting/getting parameters
- (can change subchannel and component labels)
+ (can change subchannel and component labels, and ZMQ input buffer
+ parameters)
The src/ directory contains the source code of ODR-DabMux.
diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp
index a075497..fb1aa7e 100644
--- a/src/RemoteControl.cpp
+++ b/src/RemoteControl.cpp
@@ -3,7 +3,8 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012
+ Copyright (C) 2014
+ Matthias P. Braendli, matthias.braendli@mpb.li
*/
/*
This file is part of ODR-DabMux.
diff --git a/src/RemoteControl.h b/src/RemoteControl.h
index f8c14fd..a39af09 100644
--- a/src/RemoteControl.h
+++ b/src/RemoteControl.h
@@ -3,7 +3,8 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Written by Matthias P. Braendli, matthias.braendli@mpb.li, 2012
+ Copyright (C) 2014
+ Matthias P. Braendli, matthias.braendli@mpb.li
This module adds remote-control capability to some of the dabmod modules.
see testremotecontrol/test.cpp for an example of how to use this.
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index c883f35..387c8cc 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
+ Copyright (C) 2013, 2014 Matthias P. Braendli
http://mpb.li
ZeroMQ input. see www.zeromq.org for more info
@@ -40,6 +40,7 @@
#include "dabInputZmq.h"
#include "StatsServer.h"
#include "zmq.hpp"
+#include "PcDebug.h"
#ifdef HAVE_CONFIG_H
# include "config.h"
@@ -47,10 +48,11 @@
#ifdef HAVE_INPUT_ZEROMQ
-#include <stdio.h>
+#include <cstdio>
+#include <cstdlib>
#include <list>
#include <exception>
-#include <string.h>
+#include <cstring>
#include <string>
#include <sstream>
#include <limits.h>
@@ -115,14 +117,14 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
rc = readFromSocket(size);
/* Notify of a buffer overrun, and drop some frames */
- if (m_frame_buffer.size() >= INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() >= m_frame_buffer_limit) {
global_stats->notifyOverrun(m_name);
/* If the buffer is really too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length.
*/
- if (m_frame_buffer.size() >= 1.5*INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() >= 1.5*m_frame_buffer_limit) {
size_t over_max = m_frame_buffer.size() - INPUT_ZMQ_PREBUFFERING;
while (over_max--) {
@@ -217,19 +219,22 @@ int DabInputZmqMPEG::readFromSocket(int framesize)
if (msg.size() == framesize)
{
- if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() > m_frame_buffer_limit) {
etiLog.level(warn) <<
"inputZMQ " << m_name <<
" buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming frame !";
messageReceived = 0;
}
- else {
+ else if (m_enable_input) {
// copy the input frame blockwise into the frame_buffer
char* frame = new char[framesize];
memcpy(frame, data, framesize);
m_frame_buffer.push_back(frame);
}
+ else {
+ return 0;
+ }
}
else {
etiLog.level(error) <<
@@ -273,14 +278,14 @@ int DabInputZmqAAC::readFromSocket(int framesize)
*/
if (msg.size() == 5*framesize)
{
- if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ if (m_frame_buffer.size() > m_frame_buffer_limit) {
etiLog.level(warn) <<
"inputZMQ " << m_name <<
" buffer full (" << m_frame_buffer.size() << "),"
" dropping incoming superframe !";
messageReceived = 0;
}
- else {
+ else if (m_enable_input) {
// copy the input frame blockwise into the frame_buffer
for (char* framestart = data;
framestart < &data[5*framesize];
@@ -290,6 +295,9 @@ int DabInputZmqAAC::readFromSocket(int framesize)
m_frame_buffer.push_back(frame);
}
}
+ else {
+ return 0;
+ }
}
else {
etiLog.level(error) <<
@@ -309,7 +317,29 @@ void DabInputZmqBase::set_parameter(string parameter, string value)
ss.exceptions ( stringstream::failbit | stringstream::badbit );
if (parameter == "buffer") {
- throw ParameterError("Parameter 'buffer' is read-only");
+ size_t new_limit = atol(value.c_str());
+
+ if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too small."
+ " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) );
+ }
+ else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too large."
+ " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
+ }
+
+ m_frame_buffer_limit = new_limit;
+ }
+ else if (parameter == "enable") {
+ if (value == "1") {
+ m_enable_input = true;
+ }
+ else if (value == "0") {
+ m_enable_input = false;
+ }
+ else {
+ throw ParameterError("Value not understood, specify 0 or 1.");
+ }
}
else {
stringstream ss;
@@ -323,7 +353,13 @@ string DabInputZmqBase::get_parameter(string parameter)
{
stringstream ss;
if (parameter == "buffer") {
- ss << INPUT_ZMQ_MAX_BUFFER_SIZE;
+ ss << m_frame_buffer_limit;
+ }
+ else if (parameter == "enable") {
+ if (m_enable_input)
+ ss << "true";
+ else
+ ss << "false";
}
else {
ss << "Parameter '" << parameter <<
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index cd8df6f..cb7fdd4 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
+ Copyright (C) 2013, 2014 Matthias P. Braendli
http://mpb.li
ZeroMQ input. see www.zeromq.org for more info
@@ -61,17 +61,28 @@
// Number of elements to prebuffer before starting the pipeline
#define INPUT_ZMQ_PREBUFFERING (5*4) // 480ms
+// Default frame_buffer size in number of elements
+#define INPUT_ZMQ_DEF_BUFFER_SIZE (5*8) // 960ms
+
+// Minimum frame_buffer size in number of elements
+// This is one AAC superframe, but you probably don't want to
+// go that low anyway.
+#define INPUT_ZMQ_MIN_BUFFER_SIZE (5*1) // 120ms
+
// Maximum frame_buffer size in number of elements
-#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*8) // 960ms
+// One minute is clearly way over what everybody would
+// want.
+#define INPUT_ZMQ_MAX_BUFFER_SIZE (5*500) // 60s
class DabInputZmqBase : public DabInputBase, public RemoteControllable {
public:
DabInputZmqBase(const std::string name)
: RemoteControllable(name),
- m_name(name), m_zmq_context(1),
+ m_zmq_context(1),
m_zmq_sock(m_zmq_context, ZMQ_SUB),
- m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING) {
- }
+ m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING),
+ m_enable_input(true),
+ m_frame_buffer_limit(INPUT_ZMQ_DEF_BUFFER_SIZE) { }
virtual int open(const std::string inputUri);
virtual int readFrame(void* buffer, int size);
@@ -87,11 +98,15 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
protected:
virtual int readFromSocket(int framesize) = 0;
- std::string m_name;
zmq::context_t m_zmq_context;
zmq::socket_t m_zmq_sock; // handle for the zmq socket
int m_bitrate;
int m_prebuffering;
+
+ /* set this to zero to empty the input buffer */
+ bool m_enable_input;
+
+ size_t m_frame_buffer_limit;
std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]
};
@@ -101,6 +116,8 @@ class DabInputZmqMPEG : public DabInputZmqBase {
: DabInputZmqBase(name) {
RC_ADD_PARAMETER(buffer,
"Size of the input buffer [mpeg frames]");
+ RC_ADD_PARAMETER(enable,
+ "If the input is enabled. Set to zero to empty the buffer.");
}
private:
@@ -113,6 +130,8 @@ class DabInputZmqAAC : public DabInputZmqBase {
: DabInputZmqBase(name) {
RC_ADD_PARAMETER(buffer,
"Size of the input buffer [aac superframes]");
+ RC_ADD_PARAMETER(enable,
+ "If the input is enabled. Set to zero to empty the buffer.");
}
private: