diff options
-rwxr-xr-x | eti_tcp.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/eti_tcp.py b/eti_tcp.py new file mode 100755 index 0000000..9cb9a22 --- /dev/null +++ b/eti_tcp.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python2 +# +# A small program that transmits stdin to several TCP connections +# +# Known to work with TM 2. Be careful when using TM 1 or 4: modulator might +# lose frame phase synchronisation when frames are dropped. Setting +# PACKET_SIZE to 4*6144 should avoid this problem. +# +# 2012, mpb + +from socket import * +import threading +import Queue +import sys +import os +import time + +PACKET_SIZE=6144 +QUEUE_PACKET_CAPACITY=100 # represents 2.4 seconds of ETI data + +class Connection(object): + def __init__(self, conn, addr): + print("Got new connection from {0}".format(addr)) + self.addr = addr + self.q = Queue.Queue(PACKET_SIZE*QUEUE_PACKET_CAPACITY) + self.ch = ConnectionHandler(conn, self.q) + self.ch.start() + + def send(self, data): + if self.ch.running: + try: + self.q.put_nowait(data) + except Queue.Full: + pass + return True + else: + return False + + def join(self): + self.ch.join() + + def terminate(self): + self.ch.running = False + self.ch.close() + +class ConnectionHandler(threading.Thread): + def __init__(self, sock, queue): + self.sock = sock + self.queue = queue + self.running = True + threading.Thread.__init__(self) + + def run(self): + print("Server for {0} created".format(self.sock.getpeername())) + + # Disable Nagle's algorithm, s.t. the TCP stack does not + # put together small send()s + self.sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1) + + while self.running: + data = self.queue.get() + try: + self.sock.sendall(data) + except: + self.running = False + + self.sock.close() + + def close(self): + self.sock.close() + +import fcntl +class DataSender(threading.Thread): + + def __init__(self, connections): + print("DS starting.") + + if True: + # make stdin a non-blocking file + fd = sys.stdin.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + self.running = True + self.connections = connections + threading.Thread.__init__(self) + + def run(self): + while self.running: + try: + data = sys.stdin.read(PACKET_SIZE) + except KeyboardInterrupt: + break + except: + time.sleep(0.005) + continue + + removeconns = [] + + for c in self.connections: + if not c.send(data): + removeconns.append(c) + #print("DS: Put {0} bytes into {1} connections".format(len(data), len(self.connections))) + + for c in removeconns: + c.terminate() + self.connections.remove(c) + if len(removeconns) != 0: + print("DS: Removed {0} connections".format(len(removeconns))) + del removeconns + + print("DS: Bye") + +def listener(port): + print("eti_tcp.py [{0}] starting...".format(os.getpid())) + ADDR = ("", port) + + serv = socket(AF_INET, SOCK_STREAM) + + # Can reuse a socket that is in TIME_WAIT + serv.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + + serv.bind((ADDR)) + serv.listen(3) + + connections = [] + + ds = DataSender(connections) + ds.start() + + while True: + try: + print("Main: accepting on port {0}...".format(port)) + sock, addr = serv.accept() + connections.append(Connection(sock, addr)) + except KeyboardInterrupt: + print("Interrupted") + break + + print("Terminating {0} connections".format(len(connections))) + for c in connections: + c.terminate() + + print("Joining connections") + for c in connections: + c.join() + + print("Send Ctrl-D to close stdin") + + ds.running = False + ds.join() + +def usage(): + print("""Usage: + {0} port + + port: specifies on which TCP port to listen for incoming connections""".format(sys.argv[0])) + +if len(sys.argv) != 2: + usage() + sys.exit(1) +else: + port = 0 + try: + port = int(sys.argv[1]) + except: + usage() + sys.exit(1) + + listener(port) |