aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp29
-rw-r--r--src/DabMultiplexer.cpp13
-rw-r--r--src/DabMultiplexer.h9
-rw-r--r--src/DabMux.cpp8
-rw-r--r--src/ManagementServer.cpp169
-rw-r--r--src/ManagementServer.h28
-rw-r--r--src/MuxElements.cpp16
-rw-r--r--src/MuxElements.h36
-rw-r--r--src/utils.cpp23
-rw-r--r--src/utils.h3
10 files changed, 175 insertions, 159 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 254a385..b210ac0 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -221,18 +221,35 @@ void parse_ptree(boost::property_tree::ptree& pt,
/******************** READ SERVICES PARAMETERS *************/
- map<string, DabService*> allservices;
+ map<string, shared_ptr<DabService> > allservices;
/* For each service, we keep a separate SCIdS counter */
- map<DabService*, int> SCIdS_per_service;
+ map<shared_ptr<DabService>, int> SCIdS_per_service;
ptree pt_services = pt.get_child("services");
for (ptree::iterator it = pt_services.begin();
it != pt_services.end(); ++it) {
string serviceuid = it->first;
ptree pt_service = it->second;
- DabService* service = new DabService(serviceuid);
- ensemble->services.push_back(service);
+
+ shared_ptr<DabService> service;
+
+ bool service_already_existing = false;
+
+ for (auto srv : ensemble->services)
+ {
+ if (srv->uid == serviceuid) {
+ service = srv;
+ service_already_existing = true;
+ break;
+ }
+ }
+
+ if (not service_already_existing) {
+ auto new_srv = make_shared<DabService>(serviceuid);
+ ensemble->services.push_back(new_srv);
+ service = new_srv;
+ }
int success = -5;
@@ -298,7 +315,7 @@ void parse_ptree(boost::property_tree::ptree& pt,
ptree pt_subchans = pt.get_child("subchannels");
for (ptree::iterator it = pt_subchans.begin(); it != pt_subchans.end(); ++it) {
string subchanuid = it->first;
- dabSubchannel* subchan = new dabSubchannel();
+ dabSubchannel* subchan = new dabSubchannel(subchanuid);
ensemble->subchannels.push_back(subchan);
@@ -331,7 +348,7 @@ void parse_ptree(boost::property_tree::ptree& pt,
string componentuid = it->first;
ptree pt_comp = it->second;
- DabService* service;
+ shared_ptr<DabService> service;
try {
// Those two uids serve as foreign keys to select the service+subchannel
string service_uid = pt_comp.get<string>("service");
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 55de493..413e034 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -116,6 +116,17 @@ void DabMultiplexer::prepare_watermark()
m_watermarkPos = 0;
}
+void DabMultiplexer::update_config(boost::property_tree::ptree pt)
+{
+ ensemble->services.clear();
+ ensemble->components.clear();
+ ensemble->subchannels.clear();
+
+ m_pt = pt;
+
+ prepare();
+}
+
// Run a set of checks on the configuration
void DabMultiplexer::prepare()
{
@@ -369,7 +380,7 @@ void DabMultiplexer::mux_frame(std::vector<boost::shared_ptr<DabOutput> >& outpu
unsigned char etiFrame[6144];
unsigned short index = 0;
- vector<DabService*>::iterator service;
+ vector<std::shared_ptr<DabService> >::iterator service;
vector<DabComponent*>::iterator component;
vector<dabSubchannel*>::iterator subchannel;
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index ebedc82..e9843fc 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -46,6 +46,7 @@
#include "Eti.h"
#include <exception>
#include <vector>
+#include <memory>
#include <string>
#include <boost/shared_ptr.hpp>
#include <boost/property_tree/ptree.hpp>
@@ -74,6 +75,8 @@ class DabMultiplexer {
void print_info(void);
+ void update_config(boost::property_tree::ptree pt);
+
private:
void prepare_watermark(void);
void prepare_subchannels(void);
@@ -101,9 +104,9 @@ class DabMultiplexer {
unsigned int insertFIG;
unsigned int rotateFIB;
- std::vector<DabService*>::iterator serviceProgFIG0_2;
- std::vector<DabService*>::iterator serviceDataFIG0_2;
- std::vector<DabService*>::iterator serviceFIG0_17;
+ std::vector<std::shared_ptr<DabService> >::iterator serviceProgFIG0_2;
+ std::vector<std::shared_ptr<DabService> >::iterator serviceDataFIG0_2;
+ std::vector<std::shared_ptr<DabService> >::iterator serviceFIG0_17;
std::vector<DabComponent*>::iterator componentProgFIG0_8;
std::vector<DabComponent*>::iterator componentDataFIG0_8;
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index e7da965..560cf65 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -481,9 +481,11 @@ int main(int argc, char *argv[])
else if (mgmt_server.request_pending()) {
mgmt_server.update_ptree(pt);
}
- /* else if (mgmt_server.retrieve_new_ptree(pt)) {
- }
- */
+ else if (mgmt_server.retrieve_new_ptree(pt)) {
+ etiLog.level(warn) <<
+ "Detected configuration change";
+ mux.update_config(pt);
+ }
}
}
etiLog.level(info) << "Goodbye";
diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp
index fc0a43e..9687278 100644
--- a/src/ManagementServer.cpp
+++ b/src/ManagementServer.cpp
@@ -197,140 +197,107 @@ void ManagementServer::restart_thread(long)
void ManagementServer::serverThread()
{
- using boost::asio::ip::tcp;
-
m_running = true;
m_fault = false;
- while (m_running) {
- m_io_service.reset();
-
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"),
- m_listenport) );
+ std::stringstream bind_addr;
+ bind_addr << "tcp://127.0.0.1:" << m_listenport;
+ m_zmq_sock.bind(bind_addr.str().c_str());
+ while (m_running) {
+ zmq::message_t zmq_message;
+ m_zmq_sock.recv(&zmq_message);
- // Add a job to start accepting connections.
- boost::shared_ptr<tcp::socket> socket(
- new tcp::socket(acceptor.get_io_service()));
-
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
- etiLog.level(warn) << "MGMT: Waiting on connection";
- acceptor.async_accept(*socket,
- boost::bind(&ManagementServer::handle_accept,
- this,
- boost::asio::placeholders::error,
- socket,
- boost::ref(acceptor)));
- // Process event loop.
- m_io_service.run();
+ handle_message(zmq_message);
}
m_fault = true;
}
-void ManagementServer::handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor)
+bool ManagementServer::handle_setptree(
+ zmq::message_t& zmq_message, std::stringstream& answer)
{
- if (boost_error)
- {
- etiLog.level(error) << "MGMT: Error accepting connection";
- return;
- }
-
- std::stringstream welcome_msg;
-
- welcome_msg <<
- "{ \"service\": \"" <<
- PACKAGE_NAME << " " <<
-#if defined(GITVERSION)
- GITVERSION <<
-#else
- PACKAGE_VERSION <<
-#endif
- " MGMT Server\" }\n";
-
try {
- etiLog.level(info) << "RC: Accepted";
+ if (zmq_message.more()) {
+ zmq::message_t zmq_new_ptree;
+ m_zmq_sock.recv(&zmq_new_ptree);
+ std::string new_ptree(
+ (char*)zmq_new_ptree.data(), zmq_new_ptree.size()
+ );
- boost::system::error_code ignored_error;
+ etiLog.level(info) << "Received ptree " << new_ptree;
- boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()),
- boost::asio::transfer_all(),
- ignored_error);
+ boost::unique_lock<boost::mutex> lock(m_configmutex);
+ m_pt.clear();
- boost::asio::streambuf buffer;
- size_t length =
- boost::asio::read_until(*socket, buffer, "\n", ignored_error);
+ std::stringstream json_stream;
+ json_stream << new_ptree;
+ boost::property_tree::json_parser::read_json(json_stream, m_pt);
- std::string in_message;
- std::istream str(&buffer);
- std::getline(str, in_message);
+ m_retrieve_pending = true;
+ answer << "OK";
- if (in_message == "config") {
- std::string json = getStatConfigJSON();
- boost::asio::write(*socket, boost::asio::buffer(json),
- boost::asio::transfer_all(),
- ignored_error);
+ return true;
}
- else if (in_message == "values") {
- std::string json = getValuesJSON();
- boost::asio::write(*socket, boost::asio::buffer(json),
- boost::asio::transfer_all(),
- ignored_error);
+ else {
+ etiLog.level(error) <<
+ "MGMT: setptree command is missing data.";
}
- else if (in_message == "state") {
- std::string json = getStateJSON();
+ }
+ catch (std::exception& e) {
+ etiLog.level(error) <<
+ "MGMT: setptree error." << e.what();
+ }
+ return false;
+}
- boost::asio::write(*socket, boost::asio::buffer(json),
- boost::asio::transfer_all(),
- ignored_error);
- }
- else if (in_message == "setptree") {
- boost::asio::streambuf jsonbuffer;
- length = boost::asio::read_until(
- *socket, jsonbuffer, "\n", ignored_error);
+void ManagementServer::handle_message(zmq::message_t& zmq_message)
+{
+ std::stringstream answer;
+ std::string data((char*)zmq_message.data(), zmq_message.size());
- if (length > 0) {
- boost::unique_lock<boost::mutex> lock(m_configmutex);
- m_pt.clear();
+ try {
+ etiLog.level(info) << "RC: Accepted";
- std::istream json_stream(&jsonbuffer);
- boost::property_tree::json_parser::read_json(json_stream, m_pt);
- }
- else if (length == 0) {
- etiLog.level(warn) <<
- "MGMT: No JSON data received";
- }
- else {
- etiLog.level(error) <<
- "MGMT: Error JSON reception";
- }
+ if (data == "info") {
+ answer <<
+ "{ \"service\": \"" <<
+ PACKAGE_NAME << " " <<
+#if defined(GITVERSION)
+ GITVERSION <<
+#else
+ PACKAGE_VERSION <<
+#endif
+ " MGMT Server\" }\n";
+ }
+ else if (data == "config") {
+ answer << getStatConfigJSON();
+ }
+ else if (data == "values") {
+ answer << getValuesJSON();
}
- else if (in_message == "getptree") {
+ else if (data == "state") {
+ answer << getStateJSON();
+ }
+ else if (data == "setptree") {
+ handle_setptree(zmq_message, answer);
+ }
+ else if (data == "getptree") {
boost::unique_lock<boost::mutex> lock(m_configmutex);
m_pending = true;
while (m_pending && !m_retrieve_pending) {
m_condition.wait(lock);
}
- std::stringstream ss;
- boost::property_tree::json_parser::write_json(ss, m_pt);
-
- boost::asio::write(*socket, boost::asio::buffer(ss.str()),
- boost::asio::transfer_all(),
- ignored_error);
+ boost::property_tree::json_parser::write_json(answer, m_pt);
}
else {
- std::string invcmd("Invalid command\n");
- boost::asio::write(*socket, boost::asio::buffer(invcmd),
- boost::asio::transfer_all(),
- ignored_error);
+ answer << "Invalid command";
}
+
+ std::string answerstr(answer.str());
+ m_zmq_sock.send(answerstr.c_str(), answerstr.size());
}
catch (std::exception& e) {
etiLog.level(error) <<
diff --git a/src/ManagementServer.h b/src/ManagementServer.h
index cea692f..836dee4 100644
--- a/src/ManagementServer.h
+++ b/src/ManagementServer.h
@@ -7,7 +7,7 @@
http://www.opendigitalradio.org
- A TCP Socket server that serves state information and statistics for
+ A server that serves state information and statistics for
monitoring purposes, and also serves the internal configuration
property tree.
@@ -15,7 +15,7 @@
http://munin-monitoring.org/
but is not specific to it.
- The TCP Server responds in JSON, and accepts the commands:
+ The responds in JSON, and accepts the commands:
- config
- values
Inspired by the munin equivalent
@@ -27,6 +27,7 @@
Returns the internal boost property_tree that contains the
multiplexer configuration DB.
+ The server is using REQ/REP ZeroMQ sockets.
*/
/*
This file is part of ODR-DabMux.
@@ -52,11 +53,11 @@
# include "config.h"
#endif
+#include "zmq.hpp"
#include <string>
#include <map>
#include <atomic>
#include <boost/thread.hpp>
-#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
@@ -314,7 +315,8 @@ class ManagementServer
{
public:
ManagementServer() :
- m_io_service(),
+ m_zmq_context(),
+ m_zmq_sock(m_zmq_context, ZMQ_REP),
m_running(false),
m_fault(false),
m_pending(false) { }
@@ -325,7 +327,7 @@ class ManagementServer
m_fault = false;
m_pending = false;
- m_io_service.stop();
+ // TODO notify
m_thread.join();
}
@@ -359,23 +361,21 @@ class ManagementServer
void restart(void);
private:
- boost::asio::io_service m_io_service;
-
void restart_thread(long);
- /******* TCP Socket Server ******/
+ /******* Server ******/
+ zmq::context_t m_zmq_context;
+ zmq::socket_t m_zmq_sock;
+
// no copying (because of the thread)
ManagementServer(const ManagementServer& other);
void serverThread(void);
+ void handle_message(zmq::message_t& zmq_message);
+ bool handle_setptree(zmq::message_t& zmq_message, std::stringstream& answer);
bool isInputRegistered(std::string& id);
- void handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor);
-
int m_listenport;
// serverThread runs in a separate thread
@@ -418,7 +418,7 @@ class ManagementServer
boost::property_tree::ptree m_pt;
};
-// If necessary construct the management server singleton and return
+// If necessary construct the management server singleton and return
// a reference to it
ManagementServer& get_mgmt_server();
diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp
index 0a52f0b..4bd82cd 100644
--- a/src/MuxElements.cpp
+++ b/src/MuxElements.cpp
@@ -191,19 +191,19 @@ vector<DabComponent*>::iterator getComponent(
return getComponent(components, serviceId, components.end());
}
-vector<DabService*>::iterator getService(
+std::vector<std::shared_ptr<DabService> >::iterator getService(
DabComponent* component,
- vector<DabService*>& services)
+ std::vector<std::shared_ptr<DabService> >& services)
{
- vector<DabService*>::iterator service;
-
- for (service = services.begin(); service != services.end(); ++service) {
- if ((*service)->id == component->serviceId) {
- break;
+ size_t i = 0;
+ for (auto service : services) {
+ if (service->id == component->serviceId) {
+ return services.begin() + i;
}
+ i++;
}
- return service;
+ throw std::runtime_error("Service not included in any component");
}
bool DabComponent::isPacketComponent(vector<dabSubchannel*>& subchannels)
diff --git a/src/MuxElements.h b/src/MuxElements.h
index f4def85..ada7ce3 100644
--- a/src/MuxElements.h
+++ b/src/MuxElements.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2014, 2015
Matthias P. Braendli, matthias.braendli@mpb.li
This file defines all data structures used in DabMux to represent
@@ -29,6 +29,7 @@
#define _MUX_ELEMENTS
#include <vector>
+#include <memory>
#include <string>
#include <functional>
#include <algorithm>
@@ -130,7 +131,7 @@ class dabEnsemble : public RemoteControllable {
int international_table;
- std::vector<DabService*> services;
+ std::vector<std::shared_ptr<DabService> > services;
std::vector<DabComponent*> components;
std::vector<dabSubchannel*> subchannels;
};
@@ -178,7 +179,16 @@ enum dab_subchannel_type_t {
Packet = 3
};
-struct dabSubchannel {
+class dabSubchannel
+{
+public:
+ dabSubchannel(std::string& uid) :
+ uid(uid)
+ {
+ }
+
+ std::string uid;
+
std::string inputUri;
DabInputBase* input;
unsigned char id;
@@ -232,12 +242,15 @@ struct dabPacketComponent {
class DabComponent : public RemoteControllable
{
public:
- DabComponent(std::string uid) :
- RemoteControllable(uid)
+ DabComponent(std::string& uid) :
+ RemoteControllable(uid),
+ uid(uid)
{
RC_ADD_PARAMETER(label, "Label and shortlabel [label,short]");
}
+ std::string uid;
+
DabLabel label;
uint32_t serviceId;
uint8_t subchId;
@@ -270,12 +283,15 @@ class DabComponent : public RemoteControllable
class DabService : public RemoteControllable
{
public:
- DabService(std::string uid) :
- RemoteControllable(uid)
+ DabService(std::string& uid) :
+ RemoteControllable(uid),
+ uid(uid)
{
RC_ADD_PARAMETER(label, "Label and shortlabel [label,short]");
}
+ std::string uid;
+
uint32_t id;
unsigned char pty;
unsigned char language;
@@ -312,9 +328,9 @@ std::vector<DabComponent*>::iterator getComponent(
std::vector<DabComponent*>& components,
uint32_t serviceId);
-std::vector<DabService*>::iterator getService(
+std::vector<std::shared_ptr<DabService> >::iterator getService(
DabComponent* component,
- std::vector<DabService*>& services);
+ std::vector<std::shared_ptr<DabService> >& services);
unsigned short getSizeCu(dabSubchannel* subchannel);
@@ -324,5 +340,5 @@ unsigned short getSizeByte(dabSubchannel* subchannel);
unsigned short getSizeWord(dabSubchannel* subchannel);
-
#endif
+
diff --git a/src/utils.cpp b/src/utils.cpp
index 50591ae..42937c1 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -366,28 +366,27 @@ void printOutputs(vector<boost::shared_ptr<DabOutput> >& outputs)
}
}
-void printServices(vector<DabService*>& services)
+void printServices(const vector<shared_ptr<DabService> >& services)
{
- vector<DabService*>::const_iterator current;
int index = 0;
- for (current = services.begin(); current != services.end(); ++current) {
+ for (auto service : services) {
- etiLog.level(info) << "Service " << (*current)->get_rc_name();
+ etiLog.level(info) << "Service " << service->get_rc_name();
etiLog.level(info) << " label: " <<
- (*current)->label.long_label();
+ service->label.long_label();
etiLog.level(info) << " short label: " <<
- (*current)->label.short_label();
+ service->label.short_label();
- etiLog.log(info, " (0x%x)", (*current)->label.flag());
- etiLog.log(info, " id: 0x%lx (%lu)", (*current)->id,
- (*current)->id);
+ etiLog.log(info, " (0x%x)", service->label.flag());
+ etiLog.log(info, " id: 0x%lx (%lu)", service->id,
+ service->id);
- etiLog.log(info, " pty: 0x%x (%u)", (*current)->pty,
- (*current)->pty);
+ etiLog.log(info, " pty: 0x%x (%u)", service->pty,
+ service->pty);
etiLog.log(info, " language: 0x%x (%u)",
- (*current)->language, (*current)->language);
+ service->language, service->language);
++index;
}
}
diff --git a/src/utils.h b/src/utils.h
index 8197210..1756adf 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -52,7 +52,7 @@ void printUsageConfigfile(char *name, FILE* out = stderr);
* resp. subchannels*/
void printOutputs(std::vector<boost::shared_ptr<DabOutput> >& outputs);
-void printServices(std::vector<DabService*>& services);
+void printServices(std::vector<std::shared_ptr<DabService> >& services);
void printComponents(std::vector<DabComponent*>& components);
@@ -64,3 +64,4 @@ void printEnsemble(const boost::shared_ptr<dabEnsemble> ensemble);
/* Print detailed component information */
void printComponent(DabComponent* component);
#endif
+