From 1328d62f9d3a2eb9f089d531614302005c29ec37 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 21 Jun 2015 21:56:59 +0200 Subject: Replace MGMT socket by ZMQ, make services shared_ptr --- src/ConfigParser.cpp | 29 ++++++-- src/DabMultiplexer.cpp | 13 +++- src/DabMultiplexer.h | 9 ++- src/DabMux.cpp | 8 ++- src/ManagementServer.cpp | 169 +++++++++++++++++++---------------------------- src/ManagementServer.h | 28 ++++---- src/MuxElements.cpp | 16 ++--- src/MuxElements.h | 36 +++++++--- src/utils.cpp | 23 +++---- src/utils.h | 3 +- 10 files changed, 175 insertions(+), 159 deletions(-) (limited to 'src') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 254a385..b210ac0 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -221,18 +221,35 @@ void parse_ptree(boost::property_tree::ptree& pt, /******************** READ SERVICES PARAMETERS *************/ - map allservices; + map > allservices; /* For each service, we keep a separate SCIdS counter */ - map SCIdS_per_service; + map, int> SCIdS_per_service; ptree pt_services = pt.get_child("services"); for (ptree::iterator it = pt_services.begin(); it != pt_services.end(); ++it) { string serviceuid = it->first; ptree pt_service = it->second; - DabService* service = new DabService(serviceuid); - ensemble->services.push_back(service); + + shared_ptr service; + + bool service_already_existing = false; + + for (auto srv : ensemble->services) + { + if (srv->uid == serviceuid) { + service = srv; + service_already_existing = true; + break; + } + } + + if (not service_already_existing) { + auto new_srv = make_shared(serviceuid); + ensemble->services.push_back(new_srv); + service = new_srv; + } int success = -5; @@ -298,7 +315,7 @@ void parse_ptree(boost::property_tree::ptree& pt, ptree pt_subchans = pt.get_child("subchannels"); for (ptree::iterator it = pt_subchans.begin(); it != pt_subchans.end(); ++it) { string subchanuid = it->first; - dabSubchannel* subchan = new dabSubchannel(); + dabSubchannel* subchan = new dabSubchannel(subchanuid); ensemble->subchannels.push_back(subchan); @@ -331,7 +348,7 @@ void parse_ptree(boost::property_tree::ptree& pt, string componentuid = it->first; ptree pt_comp = it->second; - DabService* service; + shared_ptr service; try { // Those two uids serve as foreign keys to select the service+subchannel string service_uid = pt_comp.get("service"); diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 55de493..413e034 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -116,6 +116,17 @@ void DabMultiplexer::prepare_watermark() m_watermarkPos = 0; } +void DabMultiplexer::update_config(boost::property_tree::ptree pt) +{ + ensemble->services.clear(); + ensemble->components.clear(); + ensemble->subchannels.clear(); + + m_pt = pt; + + prepare(); +} + // Run a set of checks on the configuration void DabMultiplexer::prepare() { @@ -369,7 +380,7 @@ void DabMultiplexer::mux_frame(std::vector >& outpu unsigned char etiFrame[6144]; unsigned short index = 0; - vector::iterator service; + vector >::iterator service; vector::iterator component; vector::iterator subchannel; diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index ebedc82..e9843fc 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -46,6 +46,7 @@ #include "Eti.h" #include #include +#include #include #include #include @@ -74,6 +75,8 @@ class DabMultiplexer { void print_info(void); + void update_config(boost::property_tree::ptree pt); + private: void prepare_watermark(void); void prepare_subchannels(void); @@ -101,9 +104,9 @@ class DabMultiplexer { unsigned int insertFIG; unsigned int rotateFIB; - std::vector::iterator serviceProgFIG0_2; - std::vector::iterator serviceDataFIG0_2; - std::vector::iterator serviceFIG0_17; + std::vector >::iterator serviceProgFIG0_2; + std::vector >::iterator serviceDataFIG0_2; + std::vector >::iterator serviceFIG0_17; std::vector::iterator componentProgFIG0_8; std::vector::iterator componentDataFIG0_8; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index e7da965..560cf65 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -481,9 +481,11 @@ int main(int argc, char *argv[]) else if (mgmt_server.request_pending()) { mgmt_server.update_ptree(pt); } - /* else if (mgmt_server.retrieve_new_ptree(pt)) { - } - */ + else if (mgmt_server.retrieve_new_ptree(pt)) { + etiLog.level(warn) << + "Detected configuration change"; + mux.update_config(pt); + } } } etiLog.level(info) << "Goodbye"; diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index fc0a43e..9687278 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -197,140 +197,107 @@ void ManagementServer::restart_thread(long) void ManagementServer::serverThread() { - using boost::asio::ip::tcp; - m_running = true; m_fault = false; - while (m_running) { - m_io_service.reset(); - - tcp::acceptor acceptor(m_io_service, tcp::endpoint( - boost::asio::ip::address::from_string("127.0.0.1"), - m_listenport) ); + std::stringstream bind_addr; + bind_addr << "tcp://127.0.0.1:" << m_listenport; + m_zmq_sock.bind(bind_addr.str().c_str()); + while (m_running) { + zmq::message_t zmq_message; + m_zmq_sock.recv(&zmq_message); - // Add a job to start accepting connections. - boost::shared_ptr socket( - new tcp::socket(acceptor.get_io_service())); - - // Add an accept call to the service. This will prevent io_service::run() - // from returning. - etiLog.level(warn) << "MGMT: Waiting on connection"; - acceptor.async_accept(*socket, - boost::bind(&ManagementServer::handle_accept, - this, - boost::asio::placeholders::error, - socket, - boost::ref(acceptor))); - // Process event loop. - m_io_service.run(); + handle_message(zmq_message); } m_fault = true; } -void ManagementServer::handle_accept( - const boost::system::error_code& boost_error, - boost::shared_ptr< boost::asio::ip::tcp::socket > socket, - boost::asio::ip::tcp::acceptor& acceptor) +bool ManagementServer::handle_setptree( + zmq::message_t& zmq_message, std::stringstream& answer) { - if (boost_error) - { - etiLog.level(error) << "MGMT: Error accepting connection"; - return; - } - - std::stringstream welcome_msg; - - welcome_msg << - "{ \"service\": \"" << - PACKAGE_NAME << " " << -#if defined(GITVERSION) - GITVERSION << -#else - PACKAGE_VERSION << -#endif - " MGMT Server\" }\n"; - try { - etiLog.level(info) << "RC: Accepted"; + if (zmq_message.more()) { + zmq::message_t zmq_new_ptree; + m_zmq_sock.recv(&zmq_new_ptree); + std::string new_ptree( + (char*)zmq_new_ptree.data(), zmq_new_ptree.size() + ); - boost::system::error_code ignored_error; + etiLog.level(info) << "Received ptree " << new_ptree; - boost::asio::write(*socket, boost::asio::buffer(welcome_msg.str()), - boost::asio::transfer_all(), - ignored_error); + boost::unique_lock lock(m_configmutex); + m_pt.clear(); - boost::asio::streambuf buffer; - size_t length = - boost::asio::read_until(*socket, buffer, "\n", ignored_error); + std::stringstream json_stream; + json_stream << new_ptree; + boost::property_tree::json_parser::read_json(json_stream, m_pt); - std::string in_message; - std::istream str(&buffer); - std::getline(str, in_message); + m_retrieve_pending = true; + answer << "OK"; - if (in_message == "config") { - std::string json = getStatConfigJSON(); - boost::asio::write(*socket, boost::asio::buffer(json), - boost::asio::transfer_all(), - ignored_error); + return true; } - else if (in_message == "values") { - std::string json = getValuesJSON(); - boost::asio::write(*socket, boost::asio::buffer(json), - boost::asio::transfer_all(), - ignored_error); + else { + etiLog.level(error) << + "MGMT: setptree command is missing data."; } - else if (in_message == "state") { - std::string json = getStateJSON(); + } + catch (std::exception& e) { + etiLog.level(error) << + "MGMT: setptree error." << e.what(); + } + return false; +} - boost::asio::write(*socket, boost::asio::buffer(json), - boost::asio::transfer_all(), - ignored_error); - } - else if (in_message == "setptree") { - boost::asio::streambuf jsonbuffer; - length = boost::asio::read_until( - *socket, jsonbuffer, "\n", ignored_error); +void ManagementServer::handle_message(zmq::message_t& zmq_message) +{ + std::stringstream answer; + std::string data((char*)zmq_message.data(), zmq_message.size()); - if (length > 0) { - boost::unique_lock lock(m_configmutex); - m_pt.clear(); + try { + etiLog.level(info) << "RC: Accepted"; - std::istream json_stream(&jsonbuffer); - boost::property_tree::json_parser::read_json(json_stream, m_pt); - } - else if (length == 0) { - etiLog.level(warn) << - "MGMT: No JSON data received"; - } - else { - etiLog.level(error) << - "MGMT: Error JSON reception"; - } + if (data == "info") { + answer << + "{ \"service\": \"" << + PACKAGE_NAME << " " << +#if defined(GITVERSION) + GITVERSION << +#else + PACKAGE_VERSION << +#endif + " MGMT Server\" }\n"; + } + else if (data == "config") { + answer << getStatConfigJSON(); + } + else if (data == "values") { + answer << getValuesJSON(); } - else if (in_message == "getptree") { + else if (data == "state") { + answer << getStateJSON(); + } + else if (data == "setptree") { + handle_setptree(zmq_message, answer); + } + else if (data == "getptree") { boost::unique_lock lock(m_configmutex); m_pending = true; while (m_pending && !m_retrieve_pending) { m_condition.wait(lock); } - std::stringstream ss; - boost::property_tree::json_parser::write_json(ss, m_pt); - - boost::asio::write(*socket, boost::asio::buffer(ss.str()), - boost::asio::transfer_all(), - ignored_error); + boost::property_tree::json_parser::write_json(answer, m_pt); } else { - std::string invcmd("Invalid command\n"); - boost::asio::write(*socket, boost::asio::buffer(invcmd), - boost::asio::transfer_all(), - ignored_error); + answer << "Invalid command"; } + + std::string answerstr(answer.str()); + m_zmq_sock.send(answerstr.c_str(), answerstr.size()); } catch (std::exception& e) { etiLog.level(error) << diff --git a/src/ManagementServer.h b/src/ManagementServer.h index cea692f..836dee4 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -7,7 +7,7 @@ http://www.opendigitalradio.org - A TCP Socket server that serves state information and statistics for + A server that serves state information and statistics for monitoring purposes, and also serves the internal configuration property tree. @@ -15,7 +15,7 @@ http://munin-monitoring.org/ but is not specific to it. - The TCP Server responds in JSON, and accepts the commands: + The responds in JSON, and accepts the commands: - config - values Inspired by the munin equivalent @@ -27,6 +27,7 @@ Returns the internal boost property_tree that contains the multiplexer configuration DB. + The server is using REQ/REP ZeroMQ sockets. */ /* This file is part of ODR-DabMux. @@ -52,11 +53,11 @@ # include "config.h" #endif +#include "zmq.hpp" #include #include #include #include -#include #include #include #include @@ -314,7 +315,8 @@ class ManagementServer { public: ManagementServer() : - m_io_service(), + m_zmq_context(), + m_zmq_sock(m_zmq_context, ZMQ_REP), m_running(false), m_fault(false), m_pending(false) { } @@ -325,7 +327,7 @@ class ManagementServer m_fault = false; m_pending = false; - m_io_service.stop(); + // TODO notify m_thread.join(); } @@ -359,23 +361,21 @@ class ManagementServer void restart(void); private: - boost::asio::io_service m_io_service; - void restart_thread(long); - /******* TCP Socket Server ******/ + /******* Server ******/ + zmq::context_t m_zmq_context; + zmq::socket_t m_zmq_sock; + // no copying (because of the thread) ManagementServer(const ManagementServer& other); void serverThread(void); + void handle_message(zmq::message_t& zmq_message); + bool handle_setptree(zmq::message_t& zmq_message, std::stringstream& answer); bool isInputRegistered(std::string& id); - void handle_accept( - const boost::system::error_code& boost_error, - boost::shared_ptr< boost::asio::ip::tcp::socket > socket, - boost::asio::ip::tcp::acceptor& acceptor); - int m_listenport; // serverThread runs in a separate thread @@ -418,7 +418,7 @@ class ManagementServer boost::property_tree::ptree m_pt; }; -// If necessary construct the management server singleton and return +// If necessary construct the management server singleton and return // a reference to it ManagementServer& get_mgmt_server(); diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp index 0a52f0b..4bd82cd 100644 --- a/src/MuxElements.cpp +++ b/src/MuxElements.cpp @@ -191,19 +191,19 @@ vector::iterator getComponent( return getComponent(components, serviceId, components.end()); } -vector::iterator getService( +std::vector >::iterator getService( DabComponent* component, - vector& services) + std::vector >& services) { - vector::iterator service; - - for (service = services.begin(); service != services.end(); ++service) { - if ((*service)->id == component->serviceId) { - break; + size_t i = 0; + for (auto service : services) { + if (service->id == component->serviceId) { + return services.begin() + i; } + i++; } - return service; + throw std::runtime_error("Service not included in any component"); } bool DabComponent::isPacketComponent(vector& subchannels) diff --git a/src/MuxElements.h b/src/MuxElements.h index f4def85..ada7ce3 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2014, 2015 Matthias P. Braendli, matthias.braendli@mpb.li This file defines all data structures used in DabMux to represent @@ -29,6 +29,7 @@ #define _MUX_ELEMENTS #include +#include #include #include #include @@ -130,7 +131,7 @@ class dabEnsemble : public RemoteControllable { int international_table; - std::vector services; + std::vector > services; std::vector components; std::vector subchannels; }; @@ -178,7 +179,16 @@ enum dab_subchannel_type_t { Packet = 3 }; -struct dabSubchannel { +class dabSubchannel +{ +public: + dabSubchannel(std::string& uid) : + uid(uid) + { + } + + std::string uid; + std::string inputUri; DabInputBase* input; unsigned char id; @@ -232,12 +242,15 @@ struct dabPacketComponent { class DabComponent : public RemoteControllable { public: - DabComponent(std::string uid) : - RemoteControllable(uid) + DabComponent(std::string& uid) : + RemoteControllable(uid), + uid(uid) { RC_ADD_PARAMETER(label, "Label and shortlabel [label,short]"); } + std::string uid; + DabLabel label; uint32_t serviceId; uint8_t subchId; @@ -270,12 +283,15 @@ class DabComponent : public RemoteControllable class DabService : public RemoteControllable { public: - DabService(std::string uid) : - RemoteControllable(uid) + DabService(std::string& uid) : + RemoteControllable(uid), + uid(uid) { RC_ADD_PARAMETER(label, "Label and shortlabel [label,short]"); } + std::string uid; + uint32_t id; unsigned char pty; unsigned char language; @@ -312,9 +328,9 @@ std::vector::iterator getComponent( std::vector& components, uint32_t serviceId); -std::vector::iterator getService( +std::vector >::iterator getService( DabComponent* component, - std::vector& services); + std::vector >& services); unsigned short getSizeCu(dabSubchannel* subchannel); @@ -324,5 +340,5 @@ unsigned short getSizeByte(dabSubchannel* subchannel); unsigned short getSizeWord(dabSubchannel* subchannel); - #endif + diff --git a/src/utils.cpp b/src/utils.cpp index 50591ae..42937c1 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -366,28 +366,27 @@ void printOutputs(vector >& outputs) } } -void printServices(vector& services) +void printServices(const vector >& services) { - vector::const_iterator current; int index = 0; - for (current = services.begin(); current != services.end(); ++current) { + for (auto service : services) { - etiLog.level(info) << "Service " << (*current)->get_rc_name(); + etiLog.level(info) << "Service " << service->get_rc_name(); etiLog.level(info) << " label: " << - (*current)->label.long_label(); + service->label.long_label(); etiLog.level(info) << " short label: " << - (*current)->label.short_label(); + service->label.short_label(); - etiLog.log(info, " (0x%x)", (*current)->label.flag()); - etiLog.log(info, " id: 0x%lx (%lu)", (*current)->id, - (*current)->id); + etiLog.log(info, " (0x%x)", service->label.flag()); + etiLog.log(info, " id: 0x%lx (%lu)", service->id, + service->id); - etiLog.log(info, " pty: 0x%x (%u)", (*current)->pty, - (*current)->pty); + etiLog.log(info, " pty: 0x%x (%u)", service->pty, + service->pty); etiLog.log(info, " language: 0x%x (%u)", - (*current)->language, (*current)->language); + service->language, service->language); ++index; } } diff --git a/src/utils.h b/src/utils.h index 8197210..1756adf 100644 --- a/src/utils.h +++ b/src/utils.h @@ -52,7 +52,7 @@ void printUsageConfigfile(char *name, FILE* out = stderr); * resp. subchannels*/ void printOutputs(std::vector >& outputs); -void printServices(std::vector& services); +void printServices(std::vector >& services); void printComponents(std::vector& components); @@ -64,3 +64,4 @@ void printEnsemble(const boost::shared_ptr ensemble); /* Print detailed component information */ void printComponent(DabComponent* component); #endif + -- cgit v1.2.3