/*! * This is an implementation for the zmq ctrl API of the odr-dabmod. * * Copyright (c) 2015 by Jörgen Scott (jorgen.scott@paneda.se) * * ODR-DabMod is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * ODR-DabMod is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ODR-DabMod. If not, see . * * \code * #include "OdrModCtrl.hpp" * #include * ... * zmq::context_t ctx; * std::string error; * COdrModCtrl *pCtrl = new COdrModCtrl(&context, // zmq context * "tcp://127.0.0.1:9400", // zmq endpoint * 1000); // timeout in milliseconds * if (pCtrl->SetTxGain(50, error)) * std::cout << "Tx gain set to 50" << std::endl; * else * std::cout << "An error occured: " << error << std::endl; * delete pCtrl; // destructor will close zmq socket * * \endcode **/ #include "OdrModCtrl.hpp" #include #define MOD_GAIN "gain" #define MOD_UHD "uhd" #define PARAM_DIG_GAIN "digital" #define PARAM_TX_GAIN "txgain" #define PARAM_FREQ "freq" #define PARAM_MUTE "muting" #define PARAM_STAT_DELAY "staticdelay" COdrModCtrl::COdrModCtrl(zmq::context_t *pContext, std::string odrEndpoint, unsigned int timeoutMs) { m_pContext = pContext; m_odrEndpoint = odrEndpoint; m_timeoutMs = (uint32_t) timeoutMs; m_pReqSocket = NULL; } COdrModCtrl::~COdrModCtrl() { if (m_pReqSocket != NULL) { m_pReqSocket->close(); delete m_pReqSocket; } } //// public get methods ///////////////////////////////////////////////////////// bool COdrModCtrl::GetDigitalGain(double &gain, std::string &error) { return DoGet(MOD_GAIN, PARAM_DIG_GAIN, gain, error); } bool COdrModCtrl::GetTxGain(double &gain, std::string &error) { return DoGet(MOD_UHD, PARAM_TX_GAIN, gain, error); } bool COdrModCtrl::GetTxFrequency(double &freqHz, std::string &error) { return DoGet(MOD_UHD, PARAM_FREQ, freqHz, error); } bool COdrModCtrl::GetMuting(bool &mute, std::string &error) { return DoGet(MOD_UHD, PARAM_MUTE, (uint32_t&) mute, error); } bool COdrModCtrl::GetStaticDelay(uint32_t &delayUs, std::string &error) { return DoGet(MOD_UHD, PARAM_STAT_DELAY, delayUs, error); } //// public set methods ///////////////////////////////////////////////////////// bool COdrModCtrl::Ping() { std::string error; if (m_pReqSocket == NULL) { m_pReqSocket = new zmq::socket_t(*m_pContext, ZMQ_REQ); if (!ConnectSocket(m_pReqSocket, m_odrEndpoint, error)) return false; } std::vector msg; msg.push_back("ping"); // send the message if (!SendMessage(m_pReqSocket, msg, error)) { // destroy the socket according to the "Lazy Pirate Pattern" in // the zmq guide m_pReqSocket->close(); delete m_pReqSocket; m_pReqSocket = NULL; return false; } // wait for reply if (!RecvAll(m_pReqSocket, msg, m_timeoutMs, error)) return false; return true; } bool COdrModCtrl::SetDigitalGain(const double gain, std::string &error) { return DoSet(MOD_GAIN, PARAM_DIG_GAIN, gain, error); } bool COdrModCtrl::SetTxGain(const double gain, std::string &error) { return DoSet(MOD_UHD, PARAM_TX_GAIN, gain, error); } bool COdrModCtrl::SetTxFrequency(const double freqHz, std::string &error) { return DoSet(MOD_UHD, PARAM_FREQ, freqHz, error); } bool COdrModCtrl::SetMuting(const bool mute, std::string &error) { return DoSet(MOD_UHD, PARAM_MUTE, mute, error); } bool COdrModCtrl::SetStaticDelay(const int32_t delayUs, std::string &error) { return DoSet(MOD_UHD, PARAM_STAT_DELAY, delayUs, error); } //// private methods //////////////////////////////////////////////////////////// template bool COdrModCtrl::DoSet(const std::string module, const std::string parameter, const Type value, std::string &error) { if (m_pReqSocket == NULL) { m_pReqSocket = new zmq::socket_t(*m_pContext, ZMQ_REQ); if (!ConnectSocket(m_pReqSocket, m_odrEndpoint, error)) return false; } std::vector msg; msg.push_back("set"); msg.push_back(module); msg.push_back(parameter); std::stringstream ss; ss << value; msg.push_back(ss.str()); // send the message if (!SendMessage(m_pReqSocket, msg, error)) { // destroy the socket according to the "Lazy Pirate Pattern" in // the zmq guide m_pReqSocket->close(); delete m_pReqSocket; m_pReqSocket = NULL; return false; } // wait for reply if (!RecvAll(m_pReqSocket, msg, m_timeoutMs, error)) return false; return ParseSetReply(msg, error); } bool COdrModCtrl::ParseSetReply(const std::vector &msg, std::string &error) { error = ""; if (msg.size() < 1) error = "Bad reply format"; else if (msg.size() == 1 && msg[0] == "ok") return true; else if (msg.size() == 2 && msg[0] == "fail") { error = msg[1]; return false; } else { error = "Bad reply format"; return false; } } template bool COdrModCtrl::DoGet(const std::string module, const std::string parameter, Type &value, std::string &error) { if (m_pReqSocket == NULL) { m_pReqSocket = new zmq::socket_t(*m_pContext, ZMQ_REQ); if (!ConnectSocket(m_pReqSocket, m_odrEndpoint, error)) return false; } std::vector msg; msg.push_back("get"); msg.push_back(module); msg.push_back(parameter); // send the message if (!SendMessage(m_pReqSocket, msg, error)) { // destroy the socket according to the "Lazy Pirate Pattern" // in the zmq guide m_pReqSocket->close(); delete m_pReqSocket; m_pReqSocket = NULL; return false; } // wait for reply if (!RecvAll(m_pReqSocket, msg, m_timeoutMs, error)) return false; return ParseGetReply(msg, value, error); } template bool COdrModCtrl::ParseGetReply(const std::vector &msg, Type &value, std::string &error) { error = ""; if (msg.size() < 1) error = "Bad reply format"; else if (msg.size() == 1) { std::stringstream ss(msg[0]); ss >> value; return true; } else if (msg.size() == 2 && msg[0] == "fail") { error = msg[1]; return false; } else { error = "Bad reply format"; return false; } } bool COdrModCtrl::ConnectSocket(zmq::socket_t *pSocket, const std::string endpoint, std::string &error) { error = ""; try { int hwm = 1; int linger = 0; pSocket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); pSocket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); pSocket->setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); pSocket->connect(endpoint.c_str()); return true; } catch(zmq::error_t &ex) { error = "Failed to connect: " + endpoint + std::string(". ZMQ: " + std::string(ex.what())); return false; } } bool COdrModCtrl::SendMessage(zmq::socket_t* pSocket, const std::vector &message, std::string &error) { error = ""; try { std::vector::size_type i = 0; for ( ; i < message.size() - 1; i++) { zmq::message_t zmqMsg(message[i].length()); memcpy ((void*) zmqMsg.data(), message[i].data(), message[i].length()); pSocket->send(zmqMsg, ZMQ_SNDMORE); } zmq::message_t zmqMsg(message[i].length()); memcpy ((void*) zmqMsg.data(), message[i].data(), message[i].length()); pSocket->send(zmqMsg, 0); return true; } catch(zmq::error_t &ex) { error = "ZMQ send error: " + std::string(ex.what()); return false; } } bool COdrModCtrl::RecvAll(zmq::socket_t* pSocket, std::vector &message, unsigned int timeoutMs, std::string &error) { error = ""; message.clear(); int more = -1; size_t more_size = sizeof(more); zmq::pollitem_t pollItems[] = { {*pSocket, 0, ZMQ_POLLIN, 0} }; zmq::poll(&pollItems[0], 1, timeoutMs); while (more != 0) { if (pollItems[0].revents & ZMQ_POLLIN) { zmq::message_t msg; pSocket->recv(&msg); message.push_back(std::string((char*)msg.data(), msg.size())); pSocket->getsockopt(ZMQ_RCVMORE, &more, &more_size); } else { error = "Receive timeout"; return false; } } return true; }