aboutsummaryrefslogtreecommitdiffstats
path: root/src/tcp_sync.py
blob: 6a2e6192935406215a5964df0b23c5cdd530751f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Tcp client for synchronous uhd message tcp port"""

import Queue
import socket
import struct
import threading
import time
import traceback

class _TcpSyncClient(threading.Thread):
    """Thread that polls fixed-size binary messages from a TCP port.

    Every received packet of ``packet_size`` bytes is decoded with
    ``struct.unpack(packet_type, ...)`` and the resulting tuple is put on
    ``self.queue``.  The thread keeps running until something is put on
    ``self.q_quit`` (see ``stop``), the peer closes the connection, or a
    socket/decoding error occurs.
    """

    ip_address = None
    port = None

    def __init__(self, ip_address, port, packet_size, packet_type):
        """Store the connection parameters; the thread is not started here.

        Args:
            ip_address: host to connect to.
            port: TCP port to connect to.
            packet_size: number of bytes that make up one message.
            packet_type: ``struct`` format string used to decode a message.
        """
        super(_TcpSyncClient, self).__init__()
        # Per-instance queues: as class attributes they would be shared by
        # every client, mixing their messages and stop requests.
        self.queue = Queue.Queue()
        self.q_quit = Queue.Queue()
        self.ip_address = ip_address
        self.port = port
        self.packet_size = packet_size
        self.packet_type = packet_type

    def __exit__(self, *exc_info):
        """Request the thread to stop; accepts the optional exception triple."""
        self.stop()

    def _connect(self):
        """Retry connecting every 0.5 s; return the socket, or None if stopped."""
        print("Connecting to synchronous uhd message tcp port " + str(self.port))
        while self.q_quit.empty():
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect((self.ip_address, self.port))
            except socket.error:
                print("connecting to synchronous uhd message tcp port " + str(self.port))
                sock.close()
                time.sleep(0.5)
            else:
                print("Connected to synchronous uhd message tcp port " + str(self.port))
                return sock
        return None

    def _recv_packet(self, sock):
        """Read exactly ``packet_size`` bytes; return None if that is impossible."""
        chunks = []
        missing = self.packet_size
        while missing > 0 and self.q_quit.empty():
            # Only ask for the bytes still missing so that data belonging to
            # the next packet is never consumed by this one.
            chunk = sock.recv(missing)
            if not chunk:
                # An empty read means the peer closed the connection.
                print("synchronous uhd message tcp port " + str(self.port)
                      + " closed by peer")
                return None
            chunks.append(chunk)
            missing -= len(chunk)
        if missing > 0:
            # stop() was requested in the middle of a packet.
            return None
        return b"".join(chunks)

    def run(self):
        """Connect, then decode incoming packets onto ``queue`` until stopped."""
        sock = self._connect()
        if sock is None:
            # stop() was called before a connection could be established.
            return

        try:
            sock.settimeout(None)
            while self.q_quit.empty():
                packet = self._recv_packet(sock)
                if packet is None:
                    break
                self.queue.put(struct.unpack(self.packet_type, packet))
        except (socket.error, struct.error):
            traceback.print_exc()
            self.stop()
        finally:
            # Always release the socket, whichever way the loop ended.
            sock.close()

    def stop(self):
        """Ask the thread to finish by putting a marker on ``q_quit``."""
        print("stop tcp_sync uhd message tcp thread")
        self.q_quit.put("end")


class UhdSyncMsg(object):
    """Start a background client for the synchronous uhd message tcp port.

    Creating an instance immediately builds and starts a ``_TcpSyncClient``
    thread.  The tuples that thread decodes are read back through
    ``get_msgs``, ``get_res`` and ``has_msg``.  The object also works as a
    context manager: leaving the ``with`` block asks the thread to stop.
    """

    # NOTE(review): the default packet_size (3) does not match the size of
    # the default format "fff" (struct.calcsize gives 12), so decoding looks
    # like it would fail with the defaults - confirm callers pass both.
    def __init__(self, ip_address="127.0.0.1", port=47009, packet_size=3, packet_type="fff"):
        """Create the client thread with the given parameters and start it.

        Args:
            ip_address: host the client connects to.
            port: TCP port the client connects to.
            packet_size: number of bytes per message.
            packet_type: ``struct`` format string used to decode a message.
        """
        self.tcpa = _TcpSyncClient(ip_address, port, packet_size, packet_type)
        self.tcpa.start()

    def __enter__(self):
        """Return this object; the thread was already started in ``__init__``."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, exc_tb=None):
        """Ask the tcp thread to stop; exceptions are not suppressed."""
        self.tcpa.stop()

    def stop(self):
        """Ask the tcp thread to stop."""
        self.tcpa.stop()

    def get_msgs(self, num):
        """Return a list of ``num`` messages, blocking until all have arrived."""
        out = []
        while len(out) < num:
            out.append(self.tcpa.queue.get())
        return out

    def get_res(self):
        """Return all currently queued messages as a list, emptying the queue."""
        out = []
        while not self.tcpa.queue.empty():
            out.append(self.tcpa.queue.get())
        return out

    def has_msg(self):
        """Return True if any message was queued; the queue is emptied."""
        # get_res() returns a list, so test it for emptiness; comparing it
        # with "" was always True.
        return bool(self.get_res())