diff options
Diffstat (limited to 'eti-udp/etifec.py')
-rw-r--r-- | eti-udp/etifec.py | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/eti-udp/etifec.py b/eti-udp/etifec.py new file mode 100644 index 0000000..c15b46d --- /dev/null +++ b/eti-udp/etifec.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +# +# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and +# apply FEC +# +# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +# PERFORMANCE OF THIS SOFTWARE. + + +from socket import * +import sys +import os +import time +import zfec +import struct + +MAX_WAIT_GROUPS = 8 # The number of complete groups to receive before we stop waiting + # for packets of the current group, and declare it lost. + +class ETI_Fec(object): + def __init__(self, k, m): + self.k = k + self.m = m + self.z_encoder = zfec.Encoder(k=k, m=m) + self.z_decoder = zfec.Decoder(k=k, m=m) + + def encode_eti_group(self, etidata, SEQ): + # prepend the data length (as an unsigned int) + # and add padding s.t. the total length is multiple of k + framelen = len(etidata) + + paddinglen = 40 - ((framelen+4) % 40) + + frame_uncorrected = struct.pack("I", framelen) + etidata + (" "*paddinglen) + + # split into k blocks + bs = len(frame_uncorrected)/self.k + blocks_uncorrected = [frame_uncorrected[i*bs:(i+1)*bs] for i in range(self.k)] + + # Apply FEC using ZFEC + blocks_encoded = self.z_encoder.encode(blocks_uncorrected) + + # Prepend sequence number (unsigned long) and block identificator for each block (unsigned int) + return [struct.pack("QI", SEQ, i) + block for i,block in enumerate(blocks_encoded)] + + def decode_eti_group(self, group_sn, blocks_rx, block_nr): + + #print("Decoding group {0}, using {1} blocks".format(group_sn, len(blocks_rx))) + + if len(blocks_rx) < self.k: + raise Exception("Not enough packets for frame") + + rx_blocks_encoded = blocks_rx[:self.k] + + # Use ZFEC to get the original blocks + rx_blocks_corrected = self.z_decoder.decode(rx_blocks_encoded, block_nr) + + # Concatenate together, get length, remove padding + rx_frame_padded = "".join(rx_blocks_corrected) + + rx_framelen = struct.unpack("I", rx_frame_padded[:4])[0] + + return rx_frame_padded[4:4+rx_framelen] + +if __name__ == "__main__": + from etireader import * + import StringIO + + def read_eti_group(reader): + # read 4 eti frames + return "".join([reader.next() for i in range(4)]) + + reader = EtiReader("buddard.eti") + + # k: the number of packets required for reconstruction + # m: the number of packets generated + k = 40 + m = 60 + + etifec = ETI_Fec(k, m) + + #### ENCODE + + NPACK = 20 + tx = [] + eti_tx = "" + for s in range(NPACK): + sn = s + 43 + etigroup = read_eti_group(reader) + tx += etifec.encode_eti_group(etigroup, sn) + eti_tx += etigroup + print("TX group {0} of len {1}".format(sn, len(etigroup))) + + + ###### TRANSMIT + # The network connection drops some frames, and might reorder them + + import random + + packets_rx = [] + + MAX_DIST = 25 + for packet in tx: + if random.random() < 0.75: + packets_rx.append(packet) + + if len(packets_rx) > MAX_DIST: + #do some swapping + if random.random() < 0.75: + p = random.randint(2, MAX_DIST) + packets_rx[-1], packets_rx[-p] = packets_rx[-p], packets_rx[-1] + + + ###### DECODE + + + decodedeti = {} + + blocks_grouped = {} + ignore_groups = [] + + start_delay = 10 + curr_seqnr = -1 + + rx_eti_stream = StringIO.StringIO() + for packet in packets_rx: + # Add the received packet into the right place in the dictionary + sequence_nr, block_nr = struct.unpack("QI", packet[:12]) + if sequence_nr in ignore_groups: + # We've already decoded this group. Drop. + continue + print("Registering packet sn {0} / bn {1}".format(sequence_nr, block_nr)) + if sequence_nr not in blocks_grouped: + blocks_grouped[sequence_nr] = [(block_nr, packet[12:])] + else: + blocks_grouped[sequence_nr].append((block_nr, packet[12:])) + + # If we have enough blocks for one group (with same sequence number), decode + # and add to completed list + for group_sn in blocks_grouped.keys()[:]: + if len(blocks_grouped[group_sn]) >= k: + block_numbers, blocks = zip(*blocks_grouped[group_sn]) + decodedeti[group_sn] = etifec.decode_eti_group(group_sn, blocks, block_numbers) + del blocks_grouped[group_sn] + ignore_groups.append(group_sn) + + # append to received stream in-order, handle incomplete groups + if start_delay > 0: + start_delay -= 1 + if start_delay == 1: + curr_seqnr = min(blocks_grouped.keys()) + if start_delay <= 1: + while curr_seqnr in ignore_groups: + rx_eti_stream.write(decodedeti[curr_seqnr]) + del decodedeti[curr_seqnr] + curr_seqnr += 1 + else: + if len(blocks_grouped.keys()) > MAX_WAIT_GROUPS: + if curr_seqnr in decodedeti: + del decodedeti[curr_seqnr] + ignore_groups.append(curr_seqnr) + curr_seqnr += 1 + + rx_eti_stream.seek(0) + + + print("Decoded frames: [{0}], Incomplete [{1}]".format(",".join(str(i) for i in ignore_groups), ",".join(str(i) for i in blocks_grouped.keys()))) + + if rx_eti_stream.read() == eti_tx: + print("ALL OK") + else: + print("FAIL") |