aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-07-29 20:34:07 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-07-29 20:34:07 +0200
commit52c378b59ed4920fc4805fe9154fe49bf912cc46 (patch)
tree8293098695dcf3754ef9344487693b5928f1b1f1
parent1e625548ad55d73be527c028c89ccd81163766af (diff)
downloadodr-dab-cir-52c378b59ed4920fc4805fe9154fe49bf912cc46.tar.gz
odr-dab-cir-52c378b59ed4920fc4805fe9154fe49bf912cc46.tar.bz2
odr-dab-cir-52c378b59ed4920fc4805fe9154fe49bf912cc46.zip
Switch from rtl_sdr to rtl_tcp
This permits the rtlsdr to stream permanently, apparently the start/stop cycles of rtl_sdr freak out my rtlsdr after a while
-rwxr-xr-xcir_measure.py157
-rwxr-xr-xcorrelate_with_ref.py29
2 files changed, 156 insertions, 30 deletions
diff --git a/cir_measure.py b/cir_measure.py
index 5ec3f12..8b7a3b6 100755
--- a/cir_measure.py
+++ b/cir_measure.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
#
# This is the main program that
-# - runs rtl_sdr to record files containing samples
+# - runs rtl_tcp to record files containing samples
# - runs correlate_with_ref to calculate the CIR
# - runs a webserver to present the information
#
@@ -17,43 +17,165 @@ import subprocess
import time
import datetime
import multiprocessing as mp
+import threading
+import socket
import correlate_with_ref
import shlex
import argparse
+import collections
+import numpy as np
# The record and correlate tasks run in alternance.
# Maybe later we want to run them simultaneously in a small
# pipeline.
+class RTLSDR_Receiver(threading.Thread):
+ """Connection between the rtlsdr and our script is done using a TCP socket. This
+ class handles running the rtl_tcp tool, and reads the incoming data stream into
+ a local buffer. The buffer size is capped, and works as a FIFO, because analysis
+ of the data is slower than capturing it. We therefore want to lose some data"""
+
+ def __init__(self, options):
+ threading.Thread.__init__(self)
+
+ self.freq = float(options.freq)
+ self.rate = int(options.rate)
+ self.samps = int(options.samps)
+ self.gain = float(options.gain)
+
+ # We want to keep twice the amount of samples
+ # in the queue to have some margin. Samples are
+ # two bytes because they are I/Q interleaved u8
+ self.max_num_bytes = self.samps * 2 * 2
+
+ self.event_stop = threading.Event()
+
+ self.rtl_tcp_port = 59152 # chosen randomly
+
+ self.data_queue = collections.deque()
+
+ # While the data_queue is itself thread-safe, we need to make sure
+ # the consumer cannot preeempt the little housekeeping we do in run()
+ # to keep the maximum queue length.
+ self.data_lock = threading.Lock()
+
+ self.rtlsdr_proc = None
+
+ def run(self):
+ rtl_tcp_cmdline = shlex.split("rtl_tcp -f {} -s {} -g {} -p {}".format(self.freq, self.rate, self.gain, self.rtl_tcp_port))
+ self.rtlsdr_proc = subprocess.Popen(rtl_tcp_cmdline)
+
+ time.sleep(1.5)
+
+ self.sock = socket.socket()
+ self.sock.connect(("localhost", self.rtl_tcp_port))
+
+ while not self.event_stop.is_set():
+ try:
+ samples = self.sock.recv(1024)
+
+ self.data_queue.extend(samples)
+ except:
+ print('Socket error')
+ break
+
+ self.data_lock.acquire()
+
+ # try/catch/except to make sure we release the lock, and
+ # re-raise any exception up
+ try:
+ n_bytes = len(self.data_queue)
+
+ if n_bytes > self.max_num_bytes:
+ num_to_delete = n_bytes - self.max_num_bytes
+ for i in range(num_to_delete):
+ self.data_queue.popleft()
+ except:
+ raise
+ finally:
+ self.data_lock.release()
+ print("Receiver leaving")
+
+ self.sock.close()
+
+ self.rtlsdr_proc.terminate()
+
+ self.rtlsdr_proc.wait()
+
+ print("Receiver thread ends")
+
+ def stop(self):
+ self.event_stop.set()
+ self.join()
+
+ def get_samples(self, num_samples):
+ """Return a string containing num_bytes if that is available,
+ or return None if not enough data available"""
+ ret = None
+
+ num_bytes = num_samples * 2
+
+ self.data_lock.acquire()
+
+ try:
+ n_bytes = len(self.data_queue)
+
+ if n_bytes > num_bytes:
+ ret = "".join(
+ self.data_queue.popleft()
+ for i in range(num_bytes))
+ except:
+ raise
+ finally:
+ self.data_lock.release()
+
+ return ret
+
+
class RTLSDR_CIR_Runner(mp.Process):
def __init__(self, options, iq_file, fig_file):
- """Initialise a new runner, which runs rtl_sdr
+ """Initialise a new runner, which runs rtl_tcp
that will save to iq_file, and run the CIR analysis
that will save to fig_file.
options must contain freq, rate and samps fields"""
mp.Process.__init__(self)
- self.events = mp.Queue()
-
self.freq = float(options.freq)
- self.rate = int(options.rate)
self.samps = int(options.samps)
- self.gain = float(options.gain)
+
+ self.receiver = RTLSDR_Receiver(options)
+
+ self.events = mp.Queue()
self.iq_file = iq_file
self.fig_file = fig_file
def stop(self):
self.events.put("quit")
+ self.join()
def run(self):
+
+ self.receiver.start()
+
while True:
time.sleep(1)
try:
- self.do_one_cir_run()
+ samps = self.receiver.get_samples(self.samps)
+ if samps:
+ print("Got {} samples".format(len(samps)))
+ # The RTLSDR outputs u8 format
+ iq_data = np.array( [ord(c) for c in samps], np.uint8 )
+ self.do_one_cir_run(iq_data)
+ else:
+ print("Got 0 samples")
+
except Exception as e:
print("Exception occurred: {}".format(e))
+ except KeyboardInterrupt:
+ print("Keyhoard Interrupt")
+ break
try:
ev = self.events.get_nowait()
@@ -62,25 +184,11 @@ class RTLSDR_CIR_Runner(mp.Process):
except mp.queues.Empty:
pass
- def do_one_cir_run(self):
- # Build the rtl_sdr command line from the settings in config
- rtl_sdr_cmdline = shlex.split("rtl_sdr -f {} -s {} -g {} -S -".format(self.freq, self.rate, self.gain))
- dd_cmdline = shlex.split("dd of={} bs=2 count={}".format(self.iq_file, self.samps))
-
- # To avoid calling the shell, we do the pipe between rtlsdr and dd using Popen
- rtlsdr_proc = subprocess.Popen(rtl_sdr_cmdline, stdout=subprocess.PIPE)
- dd_proc = subprocess.Popen(dd_cmdline, stdin=rtlsdr_proc.stdout)
-
- # close our connection to the pipe so that rtlsdr gets the SIGPIPE
- rtlsdr_proc.stdout.close()
-
- dd_proc.communicate()
- dd_proc.wait()
- rtlsdr_proc.wait()
+ self.receiver.stop()
- # The RTLSDR outputs u8 format
+ def do_one_cir_run(self, iq_data):
print("Starting correlation")
- cir_corr = correlate_with_ref.CIR_Correlate(self.iq_file, "u8")
+ cir_corr = correlate_with_ref.CIR_Correlate(iq_data=iq_data, iq_format="u8")
title = "Correlation on {}kHz done at {}".format(
int(self.freq / 1000),
@@ -131,4 +239,3 @@ if __name__ == '__main__':
run(host=cli_args.host, port=int(cli_args.port), debug=True, reloader=False)
finally:
rtlsdr_cir.stop()
- rtlsdr_cir.join()
diff --git a/correlate_with_ref.py b/correlate_with_ref.py
index ab55682..23c9c5c 100755
--- a/correlate_with_ref.py
+++ b/correlate_with_ref.py
@@ -33,13 +33,27 @@ T_NULL = 2656
T_TF = 196608
class CIR_Correlate:
- def __init__(self, iq_filename, iq_format):
- """Read phase reference from fixed file and load IQ data from
- iq_filename. iq_format must be fc64 or u8"""
+ def __init__(self, iq_filename="", iq_format=None, iq_data=None):
+ """Either call with iq_filename, or with iq_data containing
+ a np.array with the data.
+
+ This class will then read phase reference from fixed file and
+ load IQ data from iq_filename, or use iq_data directly.
+
+ iq_format must be fc64 or u8"""
+
+ if iq_format is None:
+ raise ValueError("Incorrect initialisation")
+
self.phase_ref = np.fromfile("phasereference.2048000.fc64.iq", np.complex64)
if iq_format == "u8":
- channel_u8_interleaved = np.fromfile(iq_filename, np.uint8)
+ if iq_filename:
+ channel_u8_interleaved = np.fromfile(iq_filename, np.uint8)
+ elif iq_data is not None:
+ channel_u8_interleaved = iq_data
+ else:
+ raise ValueError("Must give iq_filename or iq_data")
channel_u8_iq = channel_u8_interleaved.reshape(int(len(channel_u8_interleaved)/2), 2)
# This directly converts to fc64
channel_fc64_unscaled = channel_u8_iq[...,0] + np.complex64(1j) * channel_u8_iq[...,1]
@@ -47,7 +61,12 @@ class CIR_Correlate:
channel_fc64_dc_comp = channel_fc64_scaled - np.average(channel_fc64_scaled)
self.channel_out = channel_fc64_dc_comp
elif iq_format == "fc64":
- self.channel_out = np.fromfile(iq_filename, np.complex64)
+ if iq_filename:
+ self.channel_out = np.fromfile(iq_filename, np.complex64)
+ elif iq_data is not None:
+ self.channel_out = iq_data
+ else:
+ raise ValueError("Must give iq_filename or iq_data")
else:
raise ValueError("Unsupported format {}".format(iq_format))