aboutsummaryrefslogtreecommitdiffstats
path: root/eti-udp/etifec.py
blob: c15b46dca8bc0be4f2146ceb992396d8d5725a25 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python
#
# Read ETI data from standard input, in RAW, STREAMED or FRAMED format, and
# apply FEC
#
# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li>
# 
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.


from socket import *
import sys
import os
import time
import zfec
import struct

MAX_WAIT_GROUPS = 8 # The number of complete groups to receive before we stop waiting
                    # for packets of the current group, and declare it lost.

class ETI_Fec(object):
    def __init__(self, k, m):
        self.k = k
        self.m = m
        self.z_encoder = zfec.Encoder(k=k, m=m)
        self.z_decoder = zfec.Decoder(k=k, m=m)

    def encode_eti_group(self, etidata, SEQ):
        # prepend the data length (as an unsigned int)
        # and add padding s.t. the total length is multiple of k
        framelen = len(etidata)

        paddinglen = 40 - ((framelen+4) % 40)

        frame_uncorrected = struct.pack("I", framelen) + etidata + (" "*paddinglen)

        # split into k blocks
        bs = len(frame_uncorrected)/self.k
        blocks_uncorrected = [frame_uncorrected[i*bs:(i+1)*bs] for i in range(self.k)]

        # Apply FEC using ZFEC
        blocks_encoded = self.z_encoder.encode(blocks_uncorrected)

        # Prepend sequence number (unsigned long) and block identificator for each block (unsigned int)
        return [struct.pack("QI", SEQ, i) + block for i,block in enumerate(blocks_encoded)]

    def decode_eti_group(self, group_sn, blocks_rx, block_nr):

        #print("Decoding group {0}, using {1} blocks".format(group_sn, len(blocks_rx)))

        if len(blocks_rx) < self.k:
            raise Exception("Not enough packets for frame")

        rx_blocks_encoded = blocks_rx[:self.k]

        # Use ZFEC to get the original blocks
        rx_blocks_corrected = self.z_decoder.decode(rx_blocks_encoded, block_nr)

        # Concatenate together, get length, remove padding
        rx_frame_padded = "".join(rx_blocks_corrected)

        rx_framelen = struct.unpack("I", rx_frame_padded[:4])[0]

        return rx_frame_padded[4:4+rx_framelen]

if __name__ == "__main__":
    from etireader import *
    import StringIO

    def read_eti_group(reader):
        # read 4 eti frames
        return "".join([reader.next() for i in range(4)])

    reader = EtiReader("buddard.eti")

    # k: the number of packets required for reconstruction 
    # m: the number of packets generated 
    k = 40
    m = 60

    etifec = ETI_Fec(k, m)

    #### ENCODE

    NPACK = 20
    tx = []
    eti_tx = ""
    for s in range(NPACK):
        sn = s + 43
        etigroup = read_eti_group(reader)
        tx += etifec.encode_eti_group(etigroup, sn)
        eti_tx += etigroup
        print("TX group {0} of len {1}".format(sn, len(etigroup)))


    ###### TRANSMIT
    # The network connection drops some frames, and might reorder them

    import random

    packets_rx = []

    MAX_DIST = 25
    for packet in tx:
        if random.random() < 0.75:
            packets_rx.append(packet)

        if len(packets_rx) > MAX_DIST:
            #do some swapping
            if random.random() < 0.75:
                p = random.randint(2, MAX_DIST)
                packets_rx[-1], packets_rx[-p] = packets_rx[-p], packets_rx[-1]


    ###### DECODE


    decodedeti = {}

    blocks_grouped = {}
    ignore_groups = []

    start_delay = 10
    curr_seqnr = -1

    rx_eti_stream = StringIO.StringIO()
    for packet in packets_rx:
        # Add the received packet into the right place in the dictionary
        sequence_nr, block_nr = struct.unpack("QI", packet[:12])
        if sequence_nr in ignore_groups:
            # We've already decoded this group. Drop.
            continue
        print("Registering packet sn {0} / bn {1}".format(sequence_nr, block_nr))
        if sequence_nr not in blocks_grouped:
            blocks_grouped[sequence_nr] = [(block_nr, packet[12:])]
        else:
            blocks_grouped[sequence_nr].append((block_nr, packet[12:]))

        # If we have enough blocks for one group (with same sequence number), decode
        # and add to completed list
        for group_sn in blocks_grouped.keys()[:]:
            if len(blocks_grouped[group_sn]) >= k:
                block_numbers, blocks = zip(*blocks_grouped[group_sn])
                decodedeti[group_sn] = etifec.decode_eti_group(group_sn, blocks, block_numbers)
                del blocks_grouped[group_sn]
                ignore_groups.append(group_sn)

        # append to received stream in-order, handle incomplete groups
        if start_delay > 0:
            start_delay -= 1
        if start_delay == 1:
            curr_seqnr = min(blocks_grouped.keys())
        if start_delay <= 1:
            while curr_seqnr in ignore_groups:
                rx_eti_stream.write(decodedeti[curr_seqnr])
                del decodedeti[curr_seqnr]
                curr_seqnr += 1
            else:
                if len(blocks_grouped.keys()) > MAX_WAIT_GROUPS:
                    if curr_seqnr in decodedeti:
                        del decodedeti[curr_seqnr]
                    ignore_groups.append(curr_seqnr)
                    curr_seqnr += 1

    rx_eti_stream.seek(0)


    print("Decoded frames: [{0}], Incomplete [{1}]".format(",".join(str(i) for i in ignore_groups), ",".join(str(i) for i in blocks_grouped.keys())))

    if rx_eti_stream.read() == eti_tx:
        print("ALL OK")
    else:
        print("FAIL")