diff options
Diffstat (limited to 'python/gui/zmqrc.py')
-rw-r--r-- | python/gui/zmqrc.py | 84 |
1 files changed, 0 insertions, 84 deletions
diff --git a/python/gui/zmqrc.py b/python/gui/zmqrc.py deleted file mode 100644 index 3897d7a..0000000 --- a/python/gui/zmqrc.py +++ /dev/null @@ -1,84 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copyright (C) 2018 -# Matthias P. Braendli, matthias.braendli@mpb.li -# -# http://www.opendigitalradio.org -# -# This file is part of ODR-DabMod. -# -# ODR-DabMod is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# ODR-DabMod is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. -import zmq -import json - -class ModRemoteControl(object): - """Interact with ODR-DabMod using the ZMQ RC""" - def __init__(self, mod_host, mod_port=9400): - self._host = mod_host - self._port = mod_port - self._ctx = zmq.Context() - - def _read(self, message_parts): - sock = zmq.Socket(self._ctx, zmq.REQ) - sock.setsockopt(zmq.LINGER, 0) - sock.connect("tcp://{}:{}".format(self._host, self._port)) - - for i, part in enumerate(message_parts): - if i == len(message_parts) - 1: - f = 0 - else: - f = zmq.SNDMORE - sock.send(part.encode(), flags=f) - - # use poll for timeouts: - poller = zmq.Poller() - poller.register(sock, zmq.POLLIN) - if poller.poll(5*1000): # 5s timeout in milliseconds - recv = sock.recv_multipart() - sock.close() - return [r.decode() for r in recv] - else: - raise IOError("Timeout processing ZMQ request") - - def get_modules(self): - modules = {} - - for mod in [json.loads(j) for j in self._read(['list'])]: - params = {} - pv_list = self._read(['show', mod['name']]) - - for pv in pv_list: - p, _, v = pv.partition(": ") - params[p] = {"value": v.strip()} - - for p in mod['params']: - if p in params: - params[p]["help"] = mod['params'][p] - modules[mod['name']] = params - - return modules - - def get_param_value(self, module, param): - value = self._read(['get', module, param]) - if value[0] == 'fail': - raise ValueError("Error getting param: {}".format(value[1])) - else: - return value[0] - - def set_param_value(self, module, param, value): - ret = self._read(['set', module, param, value]) - if ret[0] == 'fail': - raise ValueError("Error setting param: {}".format(ret[1])) - |