1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
#!/usr/bin/env python
#
# Receive ETI data over UDP that the sender read from standard input (in RAW,
# STREAMED or FRAMED format) and transmitted using forward erasure correction
# and sequence numbering; reorder the packets and write out the decoded stream.
#
# Copyright (c) 2012, Matthias P. Braendli <matthias@mpb.li>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
from etifec import *
from socket import *
import sys
import os
import time
import zfec
import struct
import StringIO
PORT = 12525
def log(message):
    """Write *message* to standard error, followed by a newline."""
    sys.stderr.write(message + "\n")
def recieve_eti(output_fd):
    """Receive FEC-protected ETI blocks over UDP and write decoded groups in order.

    Datagrams are read forever from the module-level ``sock``. Each datagram
    starts with a 12-byte header that is unpacked with the ``struct`` format
    "QI" into a group sequence number and a block number; the remainder of
    the datagram is the block payload. Blocks are collected per sequence
    number, and as soon as ``k`` blocks of a group are available the group is
    decoded with ``ETI_Fec.decode_eti_group`` and queued for output.

    Output starts after a short start-up phase (nine accepted packets), from
    the smallest sequence number that is still collecting blocks at that
    moment. Decoded groups are then written strictly in sequence-number order.
    When more than ``MAX_WAIT_GROUPS`` groups are waiting for blocks, the
    group currently expected is declared lost and skipped.

    output_fd -- file-like object with write() and flush(); receives the
                 decoded ETI data.

    This function never returns; it ends only when an exception (for example
    KeyboardInterrupt or a socket error) propagates to the caller.
    """
    k = 40  # number of blocks needed before a group is decoded
    m = 60  # second argument of ETI_Fec; presumably the total blocks per group
    header_len = 12  # bytes sliced off every datagram for the "QI" header

    etifec = ETI_Fec(k, m)

    decodedeti = {}         # sequence number -> decoded data not yet written
    blocks_grouped = {}     # sequence number -> [(block number, payload), ...]
    ignore_groups = set()   # sequence numbers already decoded or given up on

    start_delay = 10
    curr_seqnr = -1         # next sequence number to write; set after start-up

    while True:
        packet = sock.recv(4096)

        # A datagram that cannot even hold the header is malformed; drop it
        # instead of letting struct.error terminate the receiver.
        if len(packet) < header_len:
            continue

        sequence_nr, block_nr = struct.unpack("QI", packet[:header_len])

        if sequence_nr in ignore_groups:
            # This group was already decoded (or abandoned). Drop.
            continue

        group_blocks = blocks_grouped.setdefault(sequence_nr, [])
        group_blocks.append((block_nr, packet[header_len:]))

        # Only the group that just gained a block can have reached k blocks:
        # every group is decoded and removed the moment it gets there.
        if len(group_blocks) >= k:
            block_numbers, blocks = zip(*group_blocks)
            decodedeti[sequence_nr] = etifec.decode_eti_group(
                sequence_nr, blocks, block_numbers)
            del blocks_grouped[sequence_nr]
            ignore_groups.add(sequence_nr)

        # Append to the received stream in order, handling incomplete groups.
        if start_delay > 0:
            start_delay -= 1

        if start_delay == 1:
            # End of the start-up phase: begin output at the oldest group
            # that is still collecting blocks.
            curr_seqnr = min(blocks_grouped)

        if start_delay <= 1:
            # Write every consecutive group that is ready.
            while curr_seqnr in ignore_groups:
                output_fd.write(decodedeti.pop(curr_seqnr))
                curr_seqnr += 1

            # MAX_WAIT_GROUPS is not defined in this file; presumably it is
            # provided by the star import from etifec -- TODO confirm.
            if len(blocks_grouped) > MAX_WAIT_GROUPS:
                log("Failed to receive group {0}".format(curr_seqnr))
                decodedeti.pop(curr_seqnr, None)
                # Discard the partial blocks too; otherwise the abandoned
                # group would keep counting as "waiting" forever and trigger
                # spurious failures for later groups.
                blocks_grouped.pop(curr_seqnr, None)
                ignore_groups.add(curr_seqnr)
                curr_seqnr += 1

        output_fd.flush()
# Script body: bind the UDP receive socket on PORT, open the output file and
# run the receive loop until it is interrupted with Ctrl-C.
#
# `sock` and `rx_eti_stream` stay module-level names on purpose:
# recieve_eti() looks up `sock` as a global.
sock = socket(AF_INET, SOCK_DGRAM)
try:
    sock.bind(("", PORT))
    # Binary mode: the decoded ETI stream is presumably raw bytes, which text
    # mode could corrupt on platforms that translate newlines.
    rx_eti_stream = open("foo", "wb") # sys.stdout
    try:
        recieve_eti(rx_eti_stream)
    finally:
        # Close the output file on any exit path so buffered data is not lost.
        rx_eti_stream.close()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the receiver; exit quietly.
    pass
finally:
    sock.close()
|