#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#   Copyright (C) 2019
#   Matthias P. Braendli, matthias.braendli@mpb.li
#
#    http://www.opendigitalradio.org
#
#   This file is part of ODR-DabMux.
#
#   ODR-DabMux is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as
#   published by the Free Software Foundation, either version 3 of the
#   License, or (at your option) any later version.
#
#   ODR-DabMux is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.
import json
import logging

import zmq

logger = logging.getLogger(__name__)


class RCParameter(object):
    """A single remote-control parameter: its name and current value.

    Instances created by MuxRemoteControl.load() hold ``param`` and
    ``value`` as the raw ``bytes`` received over ZMQ.
    """

    def __init__(self, param, value):
        self.param = param
        self.value = value

    def __repr__(self):
        return "{}({!r}, {!r})".format(
            type(self).__name__, self.param, self.value)


class RCModule(object):
    """Container object for RC module

    ``name`` is the module name, ``parameters`` is a list of RCParameter.
    """

    def __init__(self, name):
        self.name = name
        self.parameters = []

    def __repr__(self):
        return "{}({!r}, parameters={!r})".format(
            type(self).__name__, self.name, self.parameters)


class MuxRemoteControl(object):
    """Interact with ODR-DabMux using the ZMQ RC"""

    def __init__(self, mux_host, mux_port=12722):
        """Remember the RC endpoint; no connection is made until a request.

        mux_host -- host name or address of the multiplexer
        mux_port -- TCP port of the ZMQ remote control
        """
        self._host = mux_host
        self._port = mux_port
        self._ctx = zmq.Context()
        self.module_list = []

    def zRead(self, message_parts, timeout_ms=5000):
        """Send one multipart request and return the multipart reply.

        A fresh REQ socket is used for every request, and it is always
        closed again, including on timeout or on a send/receive error.

        message_parts -- sequence of frames to send. ``bytes`` are sent
                         unchanged, anything else is converted with
                         ``str()`` and encoded.
        timeout_ms    -- how long to wait for the reply, in milliseconds.

        Returns the list of ``bytes`` frames of the reply.
        Raises IOError if no reply arrives within the timeout.
        """
        sock = zmq.Socket(self._ctx, zmq.REQ)
        try:
            # LINGER=0: do not block on close() for unsent messages
            sock.setsockopt(zmq.LINGER, 0)
            sock.connect("tcp://{}:{}".format(self._host, self._port))

            last_index = len(message_parts) - 1
            for i, part in enumerate(message_parts):
                # Every frame except the last one is flagged SNDMORE
                flags = 0 if i == last_index else zmq.SNDMORE
                if not isinstance(part, bytes):
                    part = str(part).encode()
                logger.debug("Send %d %r", i, part)
                sock.send(part, flags=flags)

            # use poll for timeouts:
            poller = zmq.Poller()
            poller.register(sock, zmq.POLLIN)
            if not poller.poll(timeout_ms):
                raise IOError("Timeout processing ZMQ request")

            reply = sock.recv_multipart()
            logger.debug("RX %r", reply)
            return reply
        finally:
            sock.close()

    def load(self):
        """Load the list of RC modules

        Sends ``list`` and then one ``show`` per module, and stores the
        result in ``self.module_list``. The list is only replaced once
        every module was read successfully.

        Raises ValueError if a ``show`` request fails or returns a line
        that is not of the form ``name: value``, and IOError on timeout.
        """
        modules = []
        for module_json in self.zRead(['list']):
            name = json.loads(module_json)['name']
            mod = RCModule(name)

            module_params = self.zRead(['show', name])
            logger.debug("m_p %r", module_params)
            if module_params and module_params[0] == b'fail':
                raise ValueError("Error listing params of {}: {}".format(
                    name, module_params[1:]))

            for param in module_params:
                # Split on the first separator only, the value itself
                # may contain ': '
                p, v = param.split(b': ', 1)
                mod.parameters.append(RCParameter(p, v))
            modules.append(mod)

        self.module_list = modules

    def get_modules(self):
        """Return the list of RCModule filled in by the last load()."""
        return self.module_list

    def get_param_value(self, module, param):
        """Return the value of one parameter as ``bytes``.

        Raises ValueError if the multiplexer answers with ``fail``.
        """
        value = self.zRead(['get', module, param])
        if value[0] == b'fail':
            raise ValueError("Error getting param: {}".format(value[1]))
        return value[0]

    def set_param_value(self, module, param, value):
        """Set one parameter to ``value``.

        Raises ValueError if the multiplexer answers with ``fail``.
        """
        ret = self.zRead(['set', module, param, value])
        if ret[0] == b'fail':
            raise ValueError("Error setting param: {}".format(ret[1]))