diff options
-rw-r--r-- | eti-udp/README | 7 | ||||
-rwxr-xr-x | eti-udp/eti-udp-receiver.py | 102 | ||||
-rwxr-xr-x | eti-udp/eti-udp-sender.py | 63 | ||||
-rw-r--r-- | eti-udp/etifec.py | 183 | ||||
-rwxr-xr-x | eti-udp/etireader.py | 111 | ||||
-rw-r--r-- | eti-udp/ipdb.py | 11 |
6 files changed, 477 insertions, 0 deletions
diff --git a/eti-udp/README b/eti-udp/README new file mode 100644 index 0000000..cffe8bc --- /dev/null +++ b/eti-udp/README @@ -0,0 +1,7 @@ +This is an experimental UDP transport protocol +for ETI with forward erasure correction for +packet loss compensation. + +Requires: +- a recent python +- zfec diff --git a/eti-udp/eti-udp-receiver.py b/eti-udp/eti-udp-receiver.py new file mode 100755 index 0000000..aacaac9 --- /dev/null +++ b/eti-udp/eti-udp-receiver.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# +# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and transmit +# it over UDP to several receivers, using forward erasure correction and sequences +# numbering to reorder packets. +# +# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +# PERFORMANCE OF THIS SOFTWARE. + +from etifec import * +from socket import * +import sys +import os +import time +import zfec +import struct +import StringIO + +PORT = 12525 + +def log(message): + sys.stderr.write(message) + sys.stderr.write("\n") + +def recieve_eti(output_fd): + k = 40 + m = 60 + etifec = ETI_Fec(k, m) + + decodedeti = {} + + blocks_grouped = {} + ignore_groups = [] + + start_delay = 10 + curr_seqnr = -1 + + + while True: + packet = sock.recv(4096) + + # Add the received packet into the right place in the dictionary + sequence_nr, block_nr = struct.unpack("QI", packet[:12]) + if sequence_nr in ignore_groups: + # We've already decoded this group. Drop. + #log("Dropped packet from already decoded group {0}".format(sequence_nr)) + continue + #log("Registering packet sn {0} / bn {1}".format(sequence_nr, block_nr)) + if sequence_nr not in blocks_grouped: + blocks_grouped[sequence_nr] = [(block_nr, packet[12:])] + else: + blocks_grouped[sequence_nr].append((block_nr, packet[12:])) + + # If we have enough blocks for one group (with same sequence number), decode + # and add to completed list + for group_sn in blocks_grouped.keys()[:]: + if len(blocks_grouped[group_sn]) >= k: + block_numbers, blocks = zip(*blocks_grouped[group_sn]) + decodedeti[group_sn] = etifec.decode_eti_group(group_sn, blocks, block_numbers) + del blocks_grouped[group_sn] + ignore_groups.append(group_sn) + + # append to received stream in-order, handle incomplete groups + if start_delay > 0: + start_delay -= 1 + if start_delay == 1: + curr_seqnr = min(blocks_grouped.keys()) + if start_delay <= 1: + while curr_seqnr in ignore_groups: + rx_eti_stream.write(decodedeti[curr_seqnr]) + del decodedeti[curr_seqnr] + curr_seqnr += 1 + else: + if len(blocks_grouped.keys()) > MAX_WAIT_GROUPS: + import ipdb; ipdb.set_trace() + log("Failed to receive group {0}".format(curr_seqnr)) + if curr_seqnr in decodedeti: + del decodedeti[curr_seqnr] + ignore_groups.append(curr_seqnr) + curr_seqnr += 1 + + rx_eti_stream.flush() + +try: + sock = socket(AF_INET, SOCK_DGRAM) + sock.bind(("", PORT)) + rx_eti_stream = open("foo", "w") # sys.stdout + recieve_eti(rx_eti_stream) +except KeyboardInterrupt: + sock.close() + diff --git a/eti-udp/eti-udp-sender.py b/eti-udp/eti-udp-sender.py new file mode 100755 index 0000000..3eede9d --- /dev/null +++ b/eti-udp/eti-udp-sender.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# +# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and transmit +# it over UDP to several receivers, using forward erasure correction and sequences +# numbering to reorder packets. +# +# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +# PERFORMANCE OF THIS SOFTWARE. + + +from etireader import * +from etifec import * +from socket import * +import sys +import os +import time +import zfec +import struct +import StringIO + +PORT = 12525 + +udpdestinations = [('localhost', PORT)] + +sock = socket(AF_INET, SOCK_DGRAM) + +reader = EtiReader("../eti/streamed.eti") + +seqnr = 0 + +k = 40 +m = 60 +etifec = ETI_Fec(k, m) + +while True: + seqnr += 1 + # Read four ETI frames + try: + etigroup = "".join([reader.next() for i in range(4)]) + except EtiReaderException as e: + print("End of file reached") + break + + time.sleep(0.002) + + #print("Seqnr {0}".format(seqnr)) + tx_packets = etifec.encode_eti_group(etigroup, seqnr) + + for packet in tx_packets: + for dest in udpdestinations: + sock.sendto(packet, dest) + diff --git a/eti-udp/etifec.py b/eti-udp/etifec.py new file mode 100644 index 0000000..c15b46d --- /dev/null +++ b/eti-udp/etifec.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +# +# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and +# apply FEC +# +# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +# PERFORMANCE OF THIS SOFTWARE. + + +from socket import * +import sys +import os +import time +import zfec +import struct + +MAX_WAIT_GROUPS = 8 # The number of complete groups to receive before we stop waiting + # for packets of the current group, and declare it lost. + +class ETI_Fec(object): + def __init__(self, k, m): + self.k = k + self.m = m + self.z_encoder = zfec.Encoder(k=k, m=m) + self.z_decoder = zfec.Decoder(k=k, m=m) + + def encode_eti_group(self, etidata, SEQ): + # prepend the data length (as an unsigned int) + # and add padding s.t. the total length is multiple of k + framelen = len(etidata) + + paddinglen = 40 - ((framelen+4) % 40) + + frame_uncorrected = struct.pack("I", framelen) + etidata + (" "*paddinglen) + + # split into k blocks + bs = len(frame_uncorrected)/self.k + blocks_uncorrected = [frame_uncorrected[i*bs:(i+1)*bs] for i in range(self.k)] + + # Apply FEC using ZFEC + blocks_encoded = self.z_encoder.encode(blocks_uncorrected) + + # Prepend sequence number (unsigned long) and block identificator for each block (unsigned int) + return [struct.pack("QI", SEQ, i) + block for i,block in enumerate(blocks_encoded)] + + def decode_eti_group(self, group_sn, blocks_rx, block_nr): + + #print("Decoding group {0}, using {1} blocks".format(group_sn, len(blocks_rx))) + + if len(blocks_rx) < self.k: + raise Exception("Not enough packets for frame") + + rx_blocks_encoded = blocks_rx[:self.k] + + # Use ZFEC to get the original blocks + rx_blocks_corrected = self.z_decoder.decode(rx_blocks_encoded, block_nr) + + # Concatenate together, get length, remove padding + rx_frame_padded = "".join(rx_blocks_corrected) + + rx_framelen = struct.unpack("I", rx_frame_padded[:4])[0] + + return rx_frame_padded[4:4+rx_framelen] + +if __name__ == "__main__": + from etireader import * + import StringIO + + def read_eti_group(reader): + # read 4 eti frames + return "".join([reader.next() for i in range(4)]) + + reader = EtiReader("buddard.eti") + + # k: the number of packets required for reconstruction + # m: the number of packets generated + k = 40 + m = 60 + + etifec = ETI_Fec(k, m) + + #### ENCODE + + NPACK = 20 + tx = [] + eti_tx = "" + for s in range(NPACK): + sn = s + 43 + etigroup = read_eti_group(reader) + tx += etifec.encode_eti_group(etigroup, sn) + eti_tx += etigroup + print("TX group {0} of len {1}".format(sn, len(etigroup))) + + + ###### TRANSMIT + # The network connection drops some frames, and might reorder them + + import random + + packets_rx = [] + + MAX_DIST = 25 + for packet in tx: + if random.random() < 0.75: + packets_rx.append(packet) + + if len(packets_rx) > MAX_DIST: + #do some swapping + if random.random() < 0.75: + p = random.randint(2, MAX_DIST) + packets_rx[-1], packets_rx[-p] = packets_rx[-p], packets_rx[-1] + + + ###### DECODE + + + decodedeti = {} + + blocks_grouped = {} + ignore_groups = [] + + start_delay = 10 + curr_seqnr = -1 + + rx_eti_stream = StringIO.StringIO() + for packet in packets_rx: + # Add the received packet into the right place in the dictionary + sequence_nr, block_nr = struct.unpack("QI", packet[:12]) + if sequence_nr in ignore_groups: + # We've already decoded this group. Drop. + continue + print("Registering packet sn {0} / bn {1}".format(sequence_nr, block_nr)) + if sequence_nr not in blocks_grouped: + blocks_grouped[sequence_nr] = [(block_nr, packet[12:])] + else: + blocks_grouped[sequence_nr].append((block_nr, packet[12:])) + + # If we have enough blocks for one group (with same sequence number), decode + # and add to completed list + for group_sn in blocks_grouped.keys()[:]: + if len(blocks_grouped[group_sn]) >= k: + block_numbers, blocks = zip(*blocks_grouped[group_sn]) + decodedeti[group_sn] = etifec.decode_eti_group(group_sn, blocks, block_numbers) + del blocks_grouped[group_sn] + ignore_groups.append(group_sn) + + # append to received stream in-order, handle incomplete groups + if start_delay > 0: + start_delay -= 1 + if start_delay == 1: + curr_seqnr = min(blocks_grouped.keys()) + if start_delay <= 1: + while curr_seqnr in ignore_groups: + rx_eti_stream.write(decodedeti[curr_seqnr]) + del decodedeti[curr_seqnr] + curr_seqnr += 1 + else: + if len(blocks_grouped.keys()) > MAX_WAIT_GROUPS: + if curr_seqnr in decodedeti: + del decodedeti[curr_seqnr] + ignore_groups.append(curr_seqnr) + curr_seqnr += 1 + + rx_eti_stream.seek(0) + + + print("Decoded frames: [{0}], Incomplete [{1}]".format(",".join(str(i) for i in ignore_groups), ",".join(str(i) for i in blocks_grouped.keys()))) + + if rx_eti_stream.read() == eti_tx: + print("ALL OK") + else: + print("FAIL") diff --git a/eti-udp/etireader.py b/eti-udp/etireader.py new file mode 100755 index 0000000..9b91ca7 --- /dev/null +++ b/eti-udp/etireader.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python + +import struct +import os + +ETI_FORMAT_RAW = "RAW" +ETI_FORMAT_STREAMED = "STREAMED" +ETI_FORMAT_FRAMED = "FRAMED" + +class EtiReaderException(Exception): + pass + +class EtiReader(object): + def __init__(self, filename): + self.filename = filename + self.fd = open(filename, "rb") + self.fmt = self.discover_filetype() + print("EtiReader reading {0}, discovered type {1}".format(filename, self.fmt)) + + def discover_filetype(self): + self.fd.seek(0) + sync = False + i = 0 + while True: + sync = self.check_sync() + if not sync: + i = i + 1 + self.fd.seek(i) + else: + break + + if i == 0: + self.fd.seek(0) + return ETI_FORMAT_RAW + elif i == 2: + self.fd.seek(0) + return ETI_FORMAT_STREAMED + elif i == 6: + self.fd.seek(4) + return ETI_FORMAT_FRAMED + else: + print("ETI File not aligned, supposing RAW!") + return ETI_FORMAT_RAW + + + def __iter__(self): + while True: + n = self.next() + if n == "": + break + else: + yield n + + def next(self): + if self.fmt == ETI_FORMAT_RAW: + etiframe = self.fd.read(6144) + if etiframe == "": + raise EtiReaderException("Unable to read frame") + return etiframe + + + elif self.fmt == ETI_FORMAT_FRAMED or self.fmt == ETI_FORMAT_STREAMED: + + framesize_pack = self.fd.read(2) + if len(framesize_pack) < 2: + raise EtiReaderException("Unable to read frame size") + framesize = struct.unpack("H", framesize_pack)[0] + if framesize == 0 or framesize > 6144: + raise EtiReaderException("Framesize: {0}".format(framesize)) + + if not self.check_sync(): + raise EtiReaderException("Unable to read sync") + + self.fd.seek(-2, os.SEEK_CUR) + frame = self.fd.read(framesize+2) + if len(frame) < framesize: + raise EtiReaderException("Unable to read frame") + return frame + + def check_sync(self): + here = self.fd.tell() + sync_pack = self.fd.read(4) + self.fd.seek(here) + + sync = struct.unpack("I", sync_pack)[0] + + return sync == 0x49c5f8ff or sync == 0xb63a07ff + + +if __name__ == "__main__": + def etireadertest(reader): + i = 0 + for frame in reader: + allframes.append(frame) + print("Frame {0}, length {1}".format( + i, len(frame))) + i += 1 + if i > 10: + break + + allframes = [] + etireadertest(EtiReader("buddard.eti")) + + allframes = [] + etireadertest(EtiReader("streamed.eti")) + + allframes = [] + etireadertest(EtiReader("funk.eti")) + + allframes = [] + etireadertest(EtiReader("funk.raw.eti")) diff --git a/eti-udp/ipdb.py b/eti-udp/ipdb.py new file mode 100644 index 0000000..b431f80 --- /dev/null +++ b/eti-udp/ipdb.py @@ -0,0 +1,11 @@ +import sys +from IPython.Debugger import Pdb +from IPython.Shell import IPShell +from IPython import ipapi + +shell = IPShell(argv=['']) + +def set_trace(): + ip = ipapi.get() + def_colors = ip.options.colors + Pdb(def_colors).set_trace(sys._getframe().f_back) |