aboutsummaryrefslogtreecommitdiffstats
path: root/eti-udp
diff options
context:
space:
mode:
Diffstat (limited to 'eti-udp')
-rw-r--r--eti-udp/README7
-rwxr-xr-xeti-udp/eti-udp-receiver.py102
-rwxr-xr-xeti-udp/eti-udp-sender.py63
-rw-r--r--eti-udp/etifec.py183
-rwxr-xr-xeti-udp/etireader.py111
-rw-r--r--eti-udp/ipdb.py11
6 files changed, 477 insertions, 0 deletions
diff --git a/eti-udp/README b/eti-udp/README
new file mode 100644
index 0000000..cffe8bc
--- /dev/null
+++ b/eti-udp/README
@@ -0,0 +1,7 @@
+This is an experimental UDP transport protocol
+for ETI with forward erasure correction for
+packet loss compensation.
+
+Requires:
+- a recent python
+- zfec
diff --git a/eti-udp/eti-udp-receiver.py b/eti-udp/eti-udp-receiver.py
new file mode 100755
index 0000000..aacaac9
--- /dev/null
+++ b/eti-udp/eti-udp-receiver.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+#
+# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and transmit
+# it over UDP to several receivers, using forward erasure correction and sequences
+# numbering to reorder packets.
+#
+# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li>
+#
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
+# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+from etifec import *
+from socket import *
+import sys
+import os
+import time
+import zfec
+import struct
+import StringIO
+
+PORT = 12525
+
+def log(message):
+ sys.stderr.write(message)
+ sys.stderr.write("\n")
+
+def recieve_eti(output_fd):
+ k = 40
+ m = 60
+ etifec = ETI_Fec(k, m)
+
+ decodedeti = {}
+
+ blocks_grouped = {}
+ ignore_groups = []
+
+ start_delay = 10
+ curr_seqnr = -1
+
+
+ while True:
+ packet = sock.recv(4096)
+
+ # Add the received packet into the right place in the dictionary
+ sequence_nr, block_nr = struct.unpack("QI", packet[:12])
+ if sequence_nr in ignore_groups:
+ # We've already decoded this group. Drop.
+ #log("Dropped packet from already decoded group {0}".format(sequence_nr))
+ continue
+ #log("Registering packet sn {0} / bn {1}".format(sequence_nr, block_nr))
+ if sequence_nr not in blocks_grouped:
+ blocks_grouped[sequence_nr] = [(block_nr, packet[12:])]
+ else:
+ blocks_grouped[sequence_nr].append((block_nr, packet[12:]))
+
+ # If we have enough blocks for one group (with same sequence number), decode
+ # and add to completed list
+ for group_sn in blocks_grouped.keys()[:]:
+ if len(blocks_grouped[group_sn]) >= k:
+ block_numbers, blocks = zip(*blocks_grouped[group_sn])
+ decodedeti[group_sn] = etifec.decode_eti_group(group_sn, blocks, block_numbers)
+ del blocks_grouped[group_sn]
+ ignore_groups.append(group_sn)
+
+ # append to received stream in-order, handle incomplete groups
+ if start_delay > 0:
+ start_delay -= 1
+ if start_delay == 1:
+ curr_seqnr = min(blocks_grouped.keys())
+ if start_delay <= 1:
+ while curr_seqnr in ignore_groups:
+ rx_eti_stream.write(decodedeti[curr_seqnr])
+ del decodedeti[curr_seqnr]
+ curr_seqnr += 1
+ else:
+ if len(blocks_grouped.keys()) > MAX_WAIT_GROUPS:
+ import ipdb; ipdb.set_trace()
+ log("Failed to receive group {0}".format(curr_seqnr))
+ if curr_seqnr in decodedeti:
+ del decodedeti[curr_seqnr]
+ ignore_groups.append(curr_seqnr)
+ curr_seqnr += 1
+
+ rx_eti_stream.flush()
+
+try:
+ sock = socket(AF_INET, SOCK_DGRAM)
+ sock.bind(("", PORT))
+ rx_eti_stream = open("foo", "w") # sys.stdout
+ recieve_eti(rx_eti_stream)
+except KeyboardInterrupt:
+ sock.close()
+
diff --git a/eti-udp/eti-udp-sender.py b/eti-udp/eti-udp-sender.py
new file mode 100755
index 0000000..3eede9d
--- /dev/null
+++ b/eti-udp/eti-udp-sender.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+#
+# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and transmit
+# it over UDP to several receivers, using forward erasure correction and sequences
+# numbering to reorder packets.
+#
+# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li>
+#
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
+# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+
+from etireader import *
+from etifec import *
+from socket import *
+import sys
+import os
+import time
+import zfec
+import struct
+import StringIO
+
+PORT = 12525
+
+udpdestinations = [('localhost', PORT)]
+
+sock = socket(AF_INET, SOCK_DGRAM)
+
+reader = EtiReader("../eti/streamed.eti")
+
+seqnr = 0
+
+k = 40
+m = 60
+etifec = ETI_Fec(k, m)
+
+while True:
+ seqnr += 1
+ # Read four ETI frames
+ try:
+ etigroup = "".join([reader.next() for i in range(4)])
+ except EtiReaderException as e:
+ print("End of file reached")
+ break
+
+ time.sleep(0.002)
+
+ #print("Seqnr {0}".format(seqnr))
+ tx_packets = etifec.encode_eti_group(etigroup, seqnr)
+
+ for packet in tx_packets:
+ for dest in udpdestinations:
+ sock.sendto(packet, dest)
+
diff --git a/eti-udp/etifec.py b/eti-udp/etifec.py
new file mode 100644
index 0000000..c15b46d
--- /dev/null
+++ b/eti-udp/etifec.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+#
+# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and
+# apply FEC
+#
+# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li>
+#
+# Permission to use, copy, modify, and/or distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
+# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+
+
+from socket import *
+import sys
+import os
+import time
+import zfec
+import struct
+
+MAX_WAIT_GROUPS = 8 # The number of complete groups to receive before we stop waiting
+ # for packets of the current group, and declare it lost.
+
+class ETI_Fec(object):
+ def __init__(self, k, m):
+ self.k = k
+ self.m = m
+ self.z_encoder = zfec.Encoder(k=k, m=m)
+ self.z_decoder = zfec.Decoder(k=k, m=m)
+
+ def encode_eti_group(self, etidata, SEQ):
+ # prepend the data length (as an unsigned int)
+ # and add padding s.t. the total length is multiple of k
+ framelen = len(etidata)
+
+ paddinglen = 40 - ((framelen+4) % 40)
+
+ frame_uncorrected = struct.pack("I", framelen) + etidata + (" "*paddinglen)
+
+ # split into k blocks
+ bs = len(frame_uncorrected)/self.k
+ blocks_uncorrected = [frame_uncorrected[i*bs:(i+1)*bs] for i in range(self.k)]
+
+ # Apply FEC using ZFEC
+ blocks_encoded = self.z_encoder.encode(blocks_uncorrected)
+
+ # Prepend sequence number (unsigned long) and block identificator for each block (unsigned int)
+ return [struct.pack("QI", SEQ, i) + block for i,block in enumerate(blocks_encoded)]
+
+ def decode_eti_group(self, group_sn, blocks_rx, block_nr):
+
+ #print("Decoding group {0}, using {1} blocks".format(group_sn, len(blocks_rx)))
+
+ if len(blocks_rx) < self.k:
+ raise Exception("Not enough packets for frame")
+
+ rx_blocks_encoded = blocks_rx[:self.k]
+
+ # Use ZFEC to get the original blocks
+ rx_blocks_corrected = self.z_decoder.decode(rx_blocks_encoded, block_nr)
+
+ # Concatenate together, get length, remove padding
+ rx_frame_padded = "".join(rx_blocks_corrected)
+
+ rx_framelen = struct.unpack("I", rx_frame_padded[:4])[0]
+
+ return rx_frame_padded[4:4+rx_framelen]
+
+if __name__ == "__main__":
+ from etireader import *
+ import StringIO
+
+ def read_eti_group(reader):
+ # read 4 eti frames
+ return "".join([reader.next() for i in range(4)])
+
+ reader = EtiReader("buddard.eti")
+
+ # k: the number of packets required for reconstruction
+ # m: the number of packets generated
+ k = 40
+ m = 60
+
+ etifec = ETI_Fec(k, m)
+
+ #### ENCODE
+
+ NPACK = 20
+ tx = []
+ eti_tx = ""
+ for s in range(NPACK):
+ sn = s + 43
+ etigroup = read_eti_group(reader)
+ tx += etifec.encode_eti_group(etigroup, sn)
+ eti_tx += etigroup
+ print("TX group {0} of len {1}".format(sn, len(etigroup)))
+
+
+ ###### TRANSMIT
+ # The network connection drops some frames, and might reorder them
+
+ import random
+
+ packets_rx = []
+
+ MAX_DIST = 25
+ for packet in tx:
+ if random.random() < 0.75:
+ packets_rx.append(packet)
+
+ if len(packets_rx) > MAX_DIST:
+ #do some swapping
+ if random.random() < 0.75:
+ p = random.randint(2, MAX_DIST)
+ packets_rx[-1], packets_rx[-p] = packets_rx[-p], packets_rx[-1]
+
+
+ ###### DECODE
+
+
+ decodedeti = {}
+
+ blocks_grouped = {}
+ ignore_groups = []
+
+ start_delay = 10
+ curr_seqnr = -1
+
+ rx_eti_stream = StringIO.StringIO()
+ for packet in packets_rx:
+ # Add the received packet into the right place in the dictionary
+ sequence_nr, block_nr = struct.unpack("QI", packet[:12])
+ if sequence_nr in ignore_groups:
+ # We've already decoded this group. Drop.
+ continue
+ print("Registering packet sn {0} / bn {1}".format(sequence_nr, block_nr))
+ if sequence_nr not in blocks_grouped:
+ blocks_grouped[sequence_nr] = [(block_nr, packet[12:])]
+ else:
+ blocks_grouped[sequence_nr].append((block_nr, packet[12:]))
+
+ # If we have enough blocks for one group (with same sequence number), decode
+ # and add to completed list
+ for group_sn in blocks_grouped.keys()[:]:
+ if len(blocks_grouped[group_sn]) >= k:
+ block_numbers, blocks = zip(*blocks_grouped[group_sn])
+ decodedeti[group_sn] = etifec.decode_eti_group(group_sn, blocks, block_numbers)
+ del blocks_grouped[group_sn]
+ ignore_groups.append(group_sn)
+
+ # append to received stream in-order, handle incomplete groups
+ if start_delay > 0:
+ start_delay -= 1
+ if start_delay == 1:
+ curr_seqnr = min(blocks_grouped.keys())
+ if start_delay <= 1:
+ while curr_seqnr in ignore_groups:
+ rx_eti_stream.write(decodedeti[curr_seqnr])
+ del decodedeti[curr_seqnr]
+ curr_seqnr += 1
+ else:
+ if len(blocks_grouped.keys()) > MAX_WAIT_GROUPS:
+ if curr_seqnr in decodedeti:
+ del decodedeti[curr_seqnr]
+ ignore_groups.append(curr_seqnr)
+ curr_seqnr += 1
+
+ rx_eti_stream.seek(0)
+
+
+ print("Decoded frames: [{0}], Incomplete [{1}]".format(",".join(str(i) for i in ignore_groups), ",".join(str(i) for i in blocks_grouped.keys())))
+
+ if rx_eti_stream.read() == eti_tx:
+ print("ALL OK")
+ else:
+ print("FAIL")
diff --git a/eti-udp/etireader.py b/eti-udp/etireader.py
new file mode 100755
index 0000000..9b91ca7
--- /dev/null
+++ b/eti-udp/etireader.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+
+import struct
+import os
+
+ETI_FORMAT_RAW = "RAW"
+ETI_FORMAT_STREAMED = "STREAMED"
+ETI_FORMAT_FRAMED = "FRAMED"
+
+class EtiReaderException(Exception):
+ pass
+
+class EtiReader(object):
+ def __init__(self, filename):
+ self.filename = filename
+ self.fd = open(filename, "rb")
+ self.fmt = self.discover_filetype()
+ print("EtiReader reading {0}, discovered type {1}".format(filename, self.fmt))
+
+ def discover_filetype(self):
+ self.fd.seek(0)
+ sync = False
+ i = 0
+ while True:
+ sync = self.check_sync()
+ if not sync:
+ i = i + 1
+ self.fd.seek(i)
+ else:
+ break
+
+ if i == 0:
+ self.fd.seek(0)
+ return ETI_FORMAT_RAW
+ elif i == 2:
+ self.fd.seek(0)
+ return ETI_FORMAT_STREAMED
+ elif i == 6:
+ self.fd.seek(4)
+ return ETI_FORMAT_FRAMED
+ else:
+ print("ETI File not aligned, supposing RAW!")
+ return ETI_FORMAT_RAW
+
+
+ def __iter__(self):
+ while True:
+ n = self.next()
+ if n == "":
+ break
+ else:
+ yield n
+
+ def next(self):
+ if self.fmt == ETI_FORMAT_RAW:
+ etiframe = self.fd.read(6144)
+ if etiframe == "":
+ raise EtiReaderException("Unable to read frame")
+ return etiframe
+
+
+ elif self.fmt == ETI_FORMAT_FRAMED or self.fmt == ETI_FORMAT_STREAMED:
+
+ framesize_pack = self.fd.read(2)
+ if len(framesize_pack) < 2:
+ raise EtiReaderException("Unable to read frame size")
+ framesize = struct.unpack("H", framesize_pack)[0]
+ if framesize == 0 or framesize > 6144:
+ raise EtiReaderException("Framesize: {0}".format(framesize))
+
+ if not self.check_sync():
+ raise EtiReaderException("Unable to read sync")
+
+ self.fd.seek(-2, os.SEEK_CUR)
+ frame = self.fd.read(framesize+2)
+ if len(frame) < framesize:
+ raise EtiReaderException("Unable to read frame")
+ return frame
+
+ def check_sync(self):
+ here = self.fd.tell()
+ sync_pack = self.fd.read(4)
+ self.fd.seek(here)
+
+ sync = struct.unpack("I", sync_pack)[0]
+
+ return sync == 0x49c5f8ff or sync == 0xb63a07ff
+
+
+if __name__ == "__main__":
+ def etireadertest(reader):
+ i = 0
+ for frame in reader:
+ allframes.append(frame)
+ print("Frame {0}, length {1}".format(
+ i, len(frame)))
+ i += 1
+ if i > 10:
+ break
+
+ allframes = []
+ etireadertest(EtiReader("buddard.eti"))
+
+ allframes = []
+ etireadertest(EtiReader("streamed.eti"))
+
+ allframes = []
+ etireadertest(EtiReader("funk.eti"))
+
+ allframes = []
+ etireadertest(EtiReader("funk.raw.eti"))
diff --git a/eti-udp/ipdb.py b/eti-udp/ipdb.py
new file mode 100644
index 0000000..b431f80
--- /dev/null
+++ b/eti-udp/ipdb.py
@@ -0,0 +1,11 @@
+import sys
+from IPython.Debugger import Pdb
+from IPython.Shell import IPShell
+from IPython import ipapi
+
+shell = IPShell(argv=[''])
+
+def set_trace():
+ ip = ipapi.get()
+ def_colors = ip.options.colors
+ Pdb(def_colors).set_trace(sys._getframe().f_back)