summaryrefslogtreecommitdiffstats
path: root/gui/zmqrc.py
diff options
context:
space:
mode:
Diffstat (limited to 'gui/zmqrc.py')
-rw-r--r--gui/zmqrc.py84
1 files changed, 0 insertions, 84 deletions
diff --git a/gui/zmqrc.py b/gui/zmqrc.py
deleted file mode 100644
index 3897d7a..0000000
--- a/gui/zmqrc.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2018
-# Matthias P. Braendli, matthias.braendli@mpb.li
-#
-# http://www.opendigitalradio.org
-#
-# This file is part of ODR-DabMod.
-#
-# ODR-DabMod is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# ODR-DabMod is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
-import zmq
-import json
-
-class ModRemoteControl(object):
- """Interact with ODR-DabMod using the ZMQ RC"""
- def __init__(self, mod_host, mod_port=9400):
- self._host = mod_host
- self._port = mod_port
- self._ctx = zmq.Context()
-
- def _read(self, message_parts):
- sock = zmq.Socket(self._ctx, zmq.REQ)
- sock.setsockopt(zmq.LINGER, 0)
- sock.connect("tcp://{}:{}".format(self._host, self._port))
-
- for i, part in enumerate(message_parts):
- if i == len(message_parts) - 1:
- f = 0
- else:
- f = zmq.SNDMORE
- sock.send(part.encode(), flags=f)
-
- # use poll for timeouts:
- poller = zmq.Poller()
- poller.register(sock, zmq.POLLIN)
- if poller.poll(5*1000): # 5s timeout in milliseconds
- recv = sock.recv_multipart()
- sock.close()
- return [r.decode() for r in recv]
- else:
- raise IOError("Timeout processing ZMQ request")
-
- def get_modules(self):
- modules = {}
-
- for mod in [json.loads(j) for j in self._read(['list'])]:
- params = {}
- pv_list = self._read(['show', mod['name']])
-
- for pv in pv_list:
- p, _, v = pv.partition(": ")
- params[p] = {"value": v.strip()}
-
- for p in mod['params']:
- if p in params:
- params[p]["help"] = mod['params'][p]
- modules[mod['name']] = params
-
- return modules
-
- def get_param_value(self, module, param):
- value = self._read(['get', module, param])
- if value[0] == 'fail':
- raise ValueError("Error getting param: {}".format(value[1]))
- else:
- return value[0]
-
- def set_param_value(self, module, param, value):
- ret = self._read(['set', module, param, value])
- if ret[0] == 'fail':
- raise ValueError("Error setting param: {}".format(ret[1]))
-