aboutsummaryrefslogtreecommitdiffstats
path: root/eti_tcp.py
diff options
context:
space:
mode:
authorBram (morningbird) <bram@morningbird>2012-07-23 17:17:32 +0200
committerBram (morningbird) <bram@morningbird>2012-07-23 17:17:32 +0200
commitaf4380a24f515179dd1516c876c6544737ad8f27 (patch)
tree3f6c4563884c04b3be354903084909772fce78e4 /eti_tcp.py
parentcabc9bae0fa07ad5ead4c117dd97efff03f10aad (diff)
downloadmmbtools-aux-af4380a24f515179dd1516c876c6544737ad8f27.tar.gz
mmbtools-aux-af4380a24f515179dd1516c876c6544737ad8f27.tar.bz2
mmbtools-aux-af4380a24f515179dd1516c876c6544737ad8f27.zip
added missing eti_tcp.py
Diffstat (limited to 'eti_tcp.py')
-rwxr-xr-xeti_tcp.py170
1 files changed, 170 insertions, 0 deletions
diff --git a/eti_tcp.py b/eti_tcp.py
new file mode 100755
index 0000000..9cb9a22
--- /dev/null
+++ b/eti_tcp.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python2
+#
+# A small program that transmits stdin to several TCP connections
+#
+# Known to work with TM 2. Be careful when using TM 1 or 4: modulator might
+# lose frame phase synchronisation when frames are dropped. Setting
+# PACKET_SIZE to 4*6144 should avoid this problem.
+#
+# 2012, mpb
+
+from socket import *
+import threading
+import Queue
+import sys
+import os
+import time
+
+PACKET_SIZE=6144
+QUEUE_PACKET_CAPACITY=100 # represents 2.4 seconds of ETI data
+
+class Connection(object):
+ def __init__(self, conn, addr):
+ print("Got new connection from {0}".format(addr))
+ self.addr = addr
+ self.q = Queue.Queue(PACKET_SIZE*QUEUE_PACKET_CAPACITY)
+ self.ch = ConnectionHandler(conn, self.q)
+ self.ch.start()
+
+ def send(self, data):
+ if self.ch.running:
+ try:
+ self.q.put_nowait(data)
+ except Queue.Full:
+ pass
+ return True
+ else:
+ return False
+
+ def join(self):
+ self.ch.join()
+
+ def terminate(self):
+ self.ch.running = False
+ self.ch.close()
+
+class ConnectionHandler(threading.Thread):
+ def __init__(self, sock, queue):
+ self.sock = sock
+ self.queue = queue
+ self.running = True
+ threading.Thread.__init__(self)
+
+ def run(self):
+ print("Server for {0} created".format(self.sock.getpeername()))
+
+ # Disable Nagle's algorithm, s.t. the TCP stack does not
+ # put together small send()s
+ self.sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
+
+ while self.running:
+ data = self.queue.get()
+ try:
+ self.sock.sendall(data)
+ except:
+ self.running = False
+
+ self.sock.close()
+
+ def close(self):
+ self.sock.close()
+
+import fcntl
+class DataSender(threading.Thread):
+
+ def __init__(self, connections):
+ print("DS starting.")
+
+ if True:
+ # make stdin a non-blocking file
+ fd = sys.stdin.fileno()
+ fl = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
+
+ self.running = True
+ self.connections = connections
+ threading.Thread.__init__(self)
+
+ def run(self):
+ while self.running:
+ try:
+ data = sys.stdin.read(PACKET_SIZE)
+ except KeyboardInterrupt:
+ break
+ except:
+ time.sleep(0.005)
+ continue
+
+ removeconns = []
+
+ for c in self.connections:
+ if not c.send(data):
+ removeconns.append(c)
+ #print("DS: Put {0} bytes into {1} connections".format(len(data), len(self.connections)))
+
+ for c in removeconns:
+ c.terminate()
+ self.connections.remove(c)
+ if len(removeconns) != 0:
+ print("DS: Removed {0} connections".format(len(removeconns)))
+ del removeconns
+
+ print("DS: Bye")
+
+def listener(port):
+ print("eti_tcp.py [{0}] starting...".format(os.getpid()))
+ ADDR = ("", port)
+
+ serv = socket(AF_INET, SOCK_STREAM)
+
+ # Can reuse a socket that is in TIME_WAIT
+ serv.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
+
+ serv.bind((ADDR))
+ serv.listen(3)
+
+ connections = []
+
+ ds = DataSender(connections)
+ ds.start()
+
+ while True:
+ try:
+ print("Main: accepting on port {0}...".format(port))
+ sock, addr = serv.accept()
+ connections.append(Connection(sock, addr))
+ except KeyboardInterrupt:
+ print("Interrupted")
+ break
+
+ print("Terminating {0} connections".format(len(connections)))
+ for c in connections:
+ c.terminate()
+
+ print("Joining connections")
+ for c in connections:
+ c.join()
+
+ print("Send Ctrl-D to close stdin")
+
+ ds.running = False
+ ds.join()
+
+def usage():
+ print("""Usage:
+ {0} port
+
+ port: specifies on which TCP port to listen for incoming connections""".format(sys.argv[0]))
+
+if len(sys.argv) != 2:
+ usage()
+ sys.exit(1)
+else:
+ port = 0
+ try:
+ port = int(sys.argv[1])
+ except:
+ usage()
+ sys.exit(1)
+
+ listener(port)