aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xeti_tcp.py57
1 files changed, 46 insertions, 11 deletions
diff --git a/eti_tcp.py b/eti_tcp.py
index 9cb9a22..71199a3 100755
--- a/eti_tcp.py
+++ b/eti_tcp.py
@@ -2,11 +2,33 @@
#
# A small program that transmits stdin to several TCP connections
#
-# Known to work with TM 2. Be careful when using TM 1 or 4: modulator might
-# lose frame phase synchronisation when frames are dropped. Setting
-# PACKET_SIZE to 4*6144 should avoid this problem.
+# We set PACKET_SIZE to 4*6144 to put four ETI frames together
+# that will become one DAB frame in TM I.
#
-# 2012, mpb
+# Copyright (c) 2015, Matthias P. Braendli
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice, this
+# list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
from socket import *
import threading
@@ -31,7 +53,8 @@ class Connection(object):
try:
self.q.put_nowait(data)
except Queue.Full:
- pass
+ # Abort here, otherwise we send an incomplete stream !
+ self.terminate()
return True
else:
return False
@@ -42,7 +65,7 @@ class Connection(object):
def terminate(self):
self.ch.running = False
self.ch.close()
-
+
class ConnectionHandler(threading.Thread):
def __init__(self, sock, queue):
self.sock = sock
@@ -72,7 +95,7 @@ class ConnectionHandler(threading.Thread):
import fcntl
class DataSender(threading.Thread):
- def __init__(self, connections):
+ def __init__(self, connections, lock):
print("DS starting.")
if True:
@@ -83,6 +106,7 @@ class DataSender(threading.Thread):
self.running = True
self.connections = connections
+ self.lock = lock
threading.Thread.__init__(self)
def run(self):
@@ -97,6 +121,7 @@ class DataSender(threading.Thread):
removeconns = []
+ self.lock.acquire()
for c in self.connections:
if not c.send(data):
removeconns.append(c)
@@ -108,35 +133,45 @@ class DataSender(threading.Thread):
if len(removeconns) != 0:
print("DS: Removed {0} connections".format(len(removeconns)))
del removeconns
+ self.lock.release()
print("DS: Bye")
def listener(port):
print("eti_tcp.py [{0}] starting...".format(os.getpid()))
ADDR = ("", port)
-
- serv = socket(AF_INET, SOCK_STREAM)
+
+ serv = socket(AF_INET, SOCK_STREAM)
# Can reuse a socket that is in TIME_WAIT
serv.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
-
+
serv.bind((ADDR))
serv.listen(3)
connections = []
+ connections_lock = threading.Lock()
- ds = DataSender(connections)
+ ds = DataSender(connections, connections_lock)
ds.start()
while True:
try:
print("Main: accepting on port {0}...".format(port))
sock, addr = serv.accept()
+
+ connections_lock.acquire()
connections.append(Connection(sock, addr))
+ connections_lock.release()
except KeyboardInterrupt:
print("Interrupted")
break
+ try:
+ connections_lock.release()
+ except:
+ print("No need to release lock")
+
print("Terminating {0} connections".format(len(connections)))
for c in connections:
c.terminate()