summaryrefslogtreecommitdiffstats
path: root/python/lib/zmqrc.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/lib/zmqrc.py')
-rw-r--r--python/lib/zmqrc.py84
1 files changed, 84 insertions, 0 deletions
diff --git a/python/lib/zmqrc.py b/python/lib/zmqrc.py
new file mode 100644
index 0000000..3897d7a
--- /dev/null
+++ b/python/lib/zmqrc.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2018
+# Matthias P. Braendli, matthias.braendli@mpb.li
+#
+# http://www.opendigitalradio.org
+#
+# This file is part of ODR-DabMod.
+#
+# ODR-DabMod is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# ODR-DabMod is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+import zmq
+import json
+
+class ModRemoteControl(object):
+ """Interact with ODR-DabMod using the ZMQ RC"""
+ def __init__(self, mod_host, mod_port=9400):
+ self._host = mod_host
+ self._port = mod_port
+ self._ctx = zmq.Context()
+
+ def _read(self, message_parts):
+ sock = zmq.Socket(self._ctx, zmq.REQ)
+ sock.setsockopt(zmq.LINGER, 0)
+ sock.connect("tcp://{}:{}".format(self._host, self._port))
+
+ for i, part in enumerate(message_parts):
+ if i == len(message_parts) - 1:
+ f = 0
+ else:
+ f = zmq.SNDMORE
+ sock.send(part.encode(), flags=f)
+
+ # use poll for timeouts:
+ poller = zmq.Poller()
+ poller.register(sock, zmq.POLLIN)
+ if poller.poll(5*1000): # 5s timeout in milliseconds
+ recv = sock.recv_multipart()
+ sock.close()
+ return [r.decode() for r in recv]
+ else:
+ raise IOError("Timeout processing ZMQ request")
+
+ def get_modules(self):
+ modules = {}
+
+ for mod in [json.loads(j) for j in self._read(['list'])]:
+ params = {}
+ pv_list = self._read(['show', mod['name']])
+
+ for pv in pv_list:
+ p, _, v = pv.partition(": ")
+ params[p] = {"value": v.strip()}
+
+ for p in mod['params']:
+ if p in params:
+ params[p]["help"] = mod['params'][p]
+ modules[mod['name']] = params
+
+ return modules
+
+ def get_param_value(self, module, param):
+ value = self._read(['get', module, param])
+ if value[0] == 'fail':
+ raise ValueError("Error getting param: {}".format(value[1]))
+ else:
+ return value[0]
+
+ def set_param_value(self, module, param, value):
+ ret = self._read(['set', module, param, value])
+ if ret[0] == 'fail':
+ raise ValueError("Error setting param: {}".format(ret[1]))
+