diff options
-rwxr-xr-x | eti_tcp.py | 57 |
1 files changed, 46 insertions, 11 deletions
@@ -2,11 +2,33 @@ # # A small program that transmits stdin to several TCP connections # -# Known to work with TM 2. Be careful when using TM 1 or 4: modulator might -# lose frame phase synchronisation when frames are dropped. Setting -# PACKET_SIZE to 4*6144 should avoid this problem. +# We set PACKET_SIZE to 4*6144 to put four ETI frames together +# that will become one DAB frame in TM I. # -# 2012, mpb +# Copyright (c) 2015, Matthias P. Braendli +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + from socket import * import threading @@ -31,7 +53,8 @@ class Connection(object): try: self.q.put_nowait(data) except Queue.Full: - pass + # Abort here, otherwise we send an incomplete stream ! + self.terminate() return True else: return False @@ -42,7 +65,7 @@ class Connection(object): def terminate(self): self.ch.running = False self.ch.close() - + class ConnectionHandler(threading.Thread): def __init__(self, sock, queue): self.sock = sock @@ -72,7 +95,7 @@ class ConnectionHandler(threading.Thread): import fcntl class DataSender(threading.Thread): - def __init__(self, connections): + def __init__(self, connections, lock): print("DS starting.") if True: @@ -83,6 +106,7 @@ class DataSender(threading.Thread): self.running = True self.connections = connections + self.lock = lock threading.Thread.__init__(self) def run(self): @@ -97,6 +121,7 @@ class DataSender(threading.Thread): removeconns = [] + self.lock.acquire() for c in self.connections: if not c.send(data): removeconns.append(c) @@ -108,35 +133,45 @@ class DataSender(threading.Thread): if len(removeconns) != 0: print("DS: Removed {0} connections".format(len(removeconns))) del removeconns + self.lock.release() print("DS: Bye") def listener(port): print("eti_tcp.py [{0}] starting...".format(os.getpid())) ADDR = ("", port) - - serv = socket(AF_INET, SOCK_STREAM) + + serv = socket(AF_INET, SOCK_STREAM) # Can reuse a socket that is in TIME_WAIT serv.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) - + serv.bind((ADDR)) serv.listen(3) connections = [] + connections_lock = threading.Lock() - ds = DataSender(connections) + ds = DataSender(connections, connections_lock) ds.start() while True: try: print("Main: accepting on port {0}...".format(port)) sock, addr = serv.accept() + + connections_lock.acquire() connections.append(Connection(sock, addr)) + connections_lock.release() except KeyboardInterrupt: print("Interrupted") break + try: + connections_lock.release() + except: + print("No need to release lock") + print("Terminating {0} connections".format(len(connections))) for c in connections: c.terminate() |