aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-10-24 22:05:27 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-10-24 22:05:27 +0200
commitd18935e22e3c687c152bd35ad05c66bf5a19e00a (patch)
tree805e41b15e7092939fa1a06ba5b4c45cfff93e32
parent31e8a4fd8e4c057fffc84c071cc0b71a7989795c (diff)
downloaddabmux-d18935e22e3c687c152bd35ad05c66bf5a19e00a.tar.gz
dabmux-d18935e22e3c687c152bd35ad05c66bf5a19e00a.tar.bz2
dabmux-d18935e22e3c687c152bd35ad05c66bf5a19e00a.zip
Refactor StatsServer to be a bit more generic
-rw-r--r--src/StatsServer.cpp77
-rw-r--r--src/StatsServer.h34
-rw-r--r--src/dabInputZmq.cpp14
-rw-r--r--src/dabInputZmq.h3
4 files changed, 71 insertions, 57 deletions
diff --git a/src/StatsServer.cpp b/src/StatsServer.cpp
index 25e1b76..558a479 100644
--- a/src/StatsServer.cpp
+++ b/src/StatsServer.cpp
@@ -38,10 +38,12 @@
#include "StatsServer.h"
#include "Log.h"
-void StatsServer::registerInput(std::string id)
+void StatsServer::registerInput(InputStat* is)
{
boost::mutex::scoped_lock lock(m_mutex);
+ std::string id(is->get_name());
+
if (m_inputStats.count(id) == 1) {
etiLog.level(error) <<
"Double registration in Stats Server with id '" <<
@@ -49,44 +51,23 @@ void StatsServer::registerInput(std::string id)
return;
}
- InputStat is;
m_inputStats[id] = is;
}
-void StatsServer::notifyBuffer(std::string id, long bufsize)
+void StatsServer::unregisterInput(std::string id)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (isInputRegistered(id))
- m_inputStats[id].notifyBuffer(bufsize);
+ if (m_inputStats.count(id) == 1) {
+ m_inputStats.erase(id);
+ }
}
-void StatsServer::notifyPeakLevels(std::string id, int peak_left, int peak_right)
-{
- boost::mutex::scoped_lock lock(m_mutex);
- if (isInputRegistered(id))
- m_inputStats[id].notifyPeakLevels(peak_left, peak_right);
-}
-
-void StatsServer::notifyUnderrun(std::string id)
-{
- boost::mutex::scoped_lock lock(m_mutex);
-
- if (isInputRegistered(id))
- m_inputStats[id].notifyUnderrun();
-}
-
-void StatsServer::notifyOverrun(std::string id)
+bool StatsServer::isInputRegistered(std::string& id)
{
boost::mutex::scoped_lock lock(m_mutex);
- if (isInputRegistered(id))
- m_inputStats[id].notifyOverrun();
-}
-
-bool StatsServer::isInputRegistered(std::string& id)
-{
if (m_inputStats.count(id) == 0) {
etiLog.level(error) <<
"Stats Server id '" <<
@@ -98,10 +79,12 @@ bool StatsServer::isInputRegistered(std::string& id)
std::string StatsServer::getConfigJSON()
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
std::ostringstream ss;
ss << "{ \"config\" : [\n";
- std::map<std::string,InputStat>::iterator iter;
+ std::map<std::string,InputStat*>::iterator iter;
int i = 0;
for(iter = m_inputStats.begin(); iter != m_inputStats.end();
++iter, i++)
@@ -122,24 +105,26 @@ std::string StatsServer::getConfigJSON()
std::string StatsServer::getValuesJSON()
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
std::ostringstream ss;
ss << "{ \"values\" : {\n";
- std::map<std::string,InputStat>::iterator iter;
+ std::map<std::string,InputStat*>::iterator iter;
int i = 0;
for(iter = m_inputStats.begin(); iter != m_inputStats.end();
++iter, i++)
{
const std::string& id = iter->first;
- InputStat& stats = iter->second;
+ InputStat* stats = iter->second;
if (i > 0) {
ss << " ,\n";
}
ss << " \"" << id << "\" : ";
- ss << stats.encodeValuesJSON();
- stats.reset();
+ ss << stats->encodeValuesJSON();
+ stats->reset();
}
ss << "}\n}\n";
@@ -149,24 +134,26 @@ std::string StatsServer::getValuesJSON()
std::string StatsServer::getStateJSON()
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
std::ostringstream ss;
ss << "{\n";
- std::map<std::string,InputStat>::iterator iter;
+ std::map<std::string,InputStat*>::iterator iter;
int i = 0;
for(iter = m_inputStats.begin(); iter != m_inputStats.end();
++iter, i++)
{
const std::string& id = iter->first;
- InputStat& stats = iter->second;
+ InputStat* stats = iter->second;
if (i > 0) {
ss << " ,\n";
}
ss << " \"" << id << "\" : ";
- ss << stats.encodeStateJSON();
- stats.reset();
+ ss << stats->encodeStateJSON();
+ stats->reset();
}
ss << "}\n";
@@ -278,17 +265,14 @@ void StatsServer::serverThread()
}
if (strcmp(buffer, "config\n") == 0) {
- boost::mutex::scoped_lock lock(m_mutex);
std::string json = getConfigJSON();
n = write(accepted_sock, json.c_str(), json.size());
}
else if (strcmp(buffer, "values\n") == 0) {
- boost::mutex::scoped_lock lock(m_mutex);
std::string json = getValuesJSON();
n = write(accepted_sock, json.c_str(), json.size());
}
else if (strcmp(buffer, "state\n") == 0) {
- boost::mutex::scoped_lock lock(m_mutex);
std::string json = getStateJSON();
n = write(accepted_sock, json.c_str(), json.size());
}
@@ -315,6 +299,17 @@ end_serverthread:
}
}
+/************************************************/
+
+void InputStat::registerAtServer()
+{
+ global_stats->registerInput(this);
+}
+
+InputStat::~InputStat()
+{
+ global_stats->unregisterInput(m_name);
+}
std::string InputStat::encodeValuesJSON()
{
@@ -322,6 +317,8 @@ std::string InputStat::encodeValuesJSON()
const int16_t int16_max = std::numeric_limits<int16_t>::max();
+ boost::mutex::scoped_lock lock(m_mutex);
+
/* convert to dB */
int dB_l = peak_left ? round(20*log10((double)peak_left / int16_max)) : -90;
int dB_r = peak_right ? round(20*log10((double)peak_right / int16_max)) : -90;
@@ -369,6 +366,8 @@ std::string InputStat::encodeStateJSON()
input_state_t InputStat::determineState(void)
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
time_t now = time(NULL);
input_state_t state;
diff --git a/src/StatsServer.h b/src/StatsServer.h
index 731d8ca..26f90d4 100644
--- a/src/StatsServer.h
+++ b/src/StatsServer.h
@@ -146,7 +146,8 @@ enum input_state_t
class InputStat
{
public:
- InputStat() {
+ InputStat(std::string name) : m_name(name)
+ {
/* Statistics */
num_underruns = 0;
num_overruns = 0;
@@ -162,6 +163,9 @@ class InputStat
reset();
}
+ void registerAtServer(void);
+
+ ~InputStat();
// Gets called each time the statistics are transmitted,
// and resets the counters to zero
@@ -174,11 +178,15 @@ class InputStat
peak_right = 0;
}
+ std::string& get_name(void) { return m_name; }
+
/* This function is called for every frame read by
* the multiplexer
*/
void notifyBuffer(long bufsize)
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
// Statistics
if (bufsize > max_fill_buffer) {
max_fill_buffer = bufsize;
@@ -198,6 +206,8 @@ class InputStat
void notifyPeakLevels(int peak_left, int peak_right)
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
// Statistics
if (peak_left > this->peak_left) {
this->peak_left = peak_left;
@@ -233,6 +243,8 @@ class InputStat
void notifyUnderrun(void)
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
// Statistics
num_underruns++;
@@ -245,6 +257,8 @@ class InputStat
void notifyOverrun(void)
{
+ boost::mutex::scoped_lock lock(m_mutex);
+
// Statistics
num_overruns++;
@@ -262,6 +276,8 @@ class InputStat
input_state_t determineState(void);
private:
+ std::string m_name;
+
/************ STATISTICS ***********/
// minimum and maximum buffer fill since last reset
long min_fill_buffer;
@@ -283,6 +299,9 @@ class InputStat
time_t m_time_last_buffer_nonempty;
bool m_buffer_empty;
+ // The mutex that has to be held during all notify and readout
+ mutable boost::mutex m_mutex;
+
};
class StatsServer
@@ -313,19 +332,12 @@ class StatsServer
m_thread.join();
}
- void registerInput(std::string id);
+ void registerInput(InputStat* is);
+ void unregisterInput(std::string id);
bool fault_detected() { return m_fault; }
void restart(void);
- /* The following notify functions are used by the input to
- * inform the StatsServer about new values
- */
- void notifyBuffer(std::string id, long bufsize);
- void notifyPeakLevels(std::string id, int peak_left, int peak_right);
- void notifyUnderrun(std::string id);
- void notifyOverrun(std::string id);
-
private:
void restart_thread(long);
@@ -348,7 +360,7 @@ class StatsServer
int m_sock;
/******* Statistics Data ********/
- std::map<std::string, InputStat> m_inputStats;
+ std::map<std::string, InputStat*> m_inputStats;
/* Return a description of the configuration that will
* allow to define what graphs to be created
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 7e79c73..9dab6c9 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -233,7 +233,7 @@ int DabInputZmqBase::open(const std::string inputUri)
rebind();
// We want to appear in the statistics !
- global_stats->registerInput(m_name);
+ m_stats.registerAtServer();
return 0;
}
@@ -264,7 +264,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
/* Notify of a buffer overrun, and drop some frames */
if (m_frame_buffer.size() >= m_config.buffer_size) {
- global_stats->notifyOverrun(m_name);
+ m_stats.notifyOverrun();
/* If the buffer is really too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
@@ -308,13 +308,13 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
m_name.c_str());
/* During prebuffering, give a zeroed frame to the mux */
- global_stats->notifyUnderrun(m_name);
+ m_stats.notifyUnderrun();
memset(buffer, 0, size);
return size;
}
// Save stats data in bytes, not in frames
- global_stats->notifyBuffer(m_name, m_frame_buffer.size() * size);
+ m_stats.notifyBuffer(m_frame_buffer.size() * size);
if (m_frame_buffer.empty()) {
etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
@@ -323,7 +323,7 @@ int DabInputZmqBase::readFrame(void* buffer, int size)
m_prebuf_current = m_config.prebuffering;
/* We have no data to give, we give a zeroed frame */
- global_stats->notifyUnderrun(m_name);
+ m_stats.notifyUnderrun();
memset(buffer, 0, size);
return size;
}
@@ -373,7 +373,7 @@ int DabInputZmqMPEG::readFromSocket(size_t framesize)
datalen = frame->datasize;
data = ZMQ_FRAME_DATA(frame);
- global_stats->notifyPeakLevels(m_name, frame->audiolevel_left,
+ m_stats.notifyPeakLevels(frame->audiolevel_left,
frame->audiolevel_right);
}
@@ -444,7 +444,7 @@ int DabInputZmqAAC::readFromSocket(size_t framesize)
datalen = frame->datasize;
data = ZMQ_FRAME_DATA(frame);
- global_stats->notifyPeakLevels(m_name, frame->audiolevel_left,
+ m_stats.notifyPeakLevels(frame->audiolevel_left,
frame->audiolevel_right);
}
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index 1597298..2ba846b 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -167,6 +167,7 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
m_bitrate(0),
m_enable_input(true),
m_config(config),
+ m_stats(m_name),
m_prebuf_current(0) {
RC_ADD_PARAMETER(enable,
"If the input is enabled. Set to zero to empty the buffer.");
@@ -231,6 +232,8 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
std::string m_inputUri;
+ InputStat m_stats;
+
private:
int m_prebuf_current;
};