From 52c378b59ed4920fc4805fe9154fe49bf912cc46 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 29 Jul 2016 20:34:07 +0200 Subject: Switch from rtl_sdr to rtl_tcp This permits the rtlsdr to stream permanently, apparently the start/stop cycles of rtl_sdr freak out my rtlsdr after a while --- cir_measure.py | 157 ++++++++++++++++++++++++++++++++++++++++++-------- correlate_with_ref.py | 29 ++++++++-- 2 files changed, 156 insertions(+), 30 deletions(-) diff --git a/cir_measure.py b/cir_measure.py index 5ec3f12..8b7a3b6 100755 --- a/cir_measure.py +++ b/cir_measure.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # # This is the main program that -# - runs rtl_sdr to record files containing samples +# - runs rtl_tcp to record files containing samples # - runs correlate_with_ref to calculate the CIR # - runs a webserver to present the information # @@ -17,43 +17,165 @@ import subprocess import time import datetime import multiprocessing as mp +import threading +import socket import correlate_with_ref import shlex import argparse +import collections +import numpy as np # The record and correlate tasks run in alternance. # Maybe later we want to run them simultaneously in a small # pipeline. +class RTLSDR_Receiver(threading.Thread): + """Connection between the rtlsdr and our script is done using a TCP socket. This + class handles running the rtl_tcp tool, and reads the incoming data stream into + a local buffer. The buffer size is capped, and works as a FIFO, because analysis + of the data is slower than capturing it. We therefore want to lose some data""" + + def __init__(self, options): + threading.Thread.__init__(self) + + self.freq = float(options.freq) + self.rate = int(options.rate) + self.samps = int(options.samps) + self.gain = float(options.gain) + + # We want to keep twice the amount of samples + # in the queue to have some margin. Samples are + # two bytes because they are I/Q interleaved u8 + self.max_num_bytes = self.samps * 2 * 2 + + self.event_stop = threading.Event() + + self.rtl_tcp_port = 59152 # chosen randomly + + self.data_queue = collections.deque() + + # While the data_queue is itself thread-safe, we need to make sure + # the consumer cannot preeempt the little housekeeping we do in run() + # to keep the maximum queue length. + self.data_lock = threading.Lock() + + self.rtlsdr_proc = None + + def run(self): + rtl_tcp_cmdline = shlex.split("rtl_tcp -f {} -s {} -g {} -p {}".format(self.freq, self.rate, self.gain, self.rtl_tcp_port)) + self.rtlsdr_proc = subprocess.Popen(rtl_tcp_cmdline) + + time.sleep(1.5) + + self.sock = socket.socket() + self.sock.connect(("localhost", self.rtl_tcp_port)) + + while not self.event_stop.is_set(): + try: + samples = self.sock.recv(1024) + + self.data_queue.extend(samples) + except: + print('Socket error') + break + + self.data_lock.acquire() + + # try/catch/except to make sure we release the lock, and + # re-raise any exception up + try: + n_bytes = len(self.data_queue) + + if n_bytes > self.max_num_bytes: + num_to_delete = n_bytes - self.max_num_bytes + for i in range(num_to_delete): + self.data_queue.popleft() + except: + raise + finally: + self.data_lock.release() + print("Receiver leaving") + + self.sock.close() + + self.rtlsdr_proc.terminate() + + self.rtlsdr_proc.wait() + + print("Receiver thread ends") + + def stop(self): + self.event_stop.set() + self.join() + + def get_samples(self, num_samples): + """Return a string containing num_bytes if that is available, + or return None if not enough data available""" + ret = None + + num_bytes = num_samples * 2 + + self.data_lock.acquire() + + try: + n_bytes = len(self.data_queue) + + if n_bytes > num_bytes: + ret = "".join( + self.data_queue.popleft() + for i in range(num_bytes)) + except: + raise + finally: + self.data_lock.release() + + return ret + + class RTLSDR_CIR_Runner(mp.Process): def __init__(self, options, iq_file, fig_file): - """Initialise a new runner, which runs rtl_sdr + """Initialise a new runner, which runs rtl_tcp that will save to iq_file, and run the CIR analysis that will save to fig_file. options must contain freq, rate and samps fields""" mp.Process.__init__(self) - self.events = mp.Queue() - self.freq = float(options.freq) - self.rate = int(options.rate) self.samps = int(options.samps) - self.gain = float(options.gain) + + self.receiver = RTLSDR_Receiver(options) + + self.events = mp.Queue() self.iq_file = iq_file self.fig_file = fig_file def stop(self): self.events.put("quit") + self.join() def run(self): + + self.receiver.start() + while True: time.sleep(1) try: - self.do_one_cir_run() + samps = self.receiver.get_samples(self.samps) + if samps: + print("Got {} samples".format(len(samps))) + # The RTLSDR outputs u8 format + iq_data = np.array( [ord(c) for c in samps], np.uint8 ) + self.do_one_cir_run(iq_data) + else: + print("Got 0 samples") + except Exception as e: print("Exception occurred: {}".format(e)) + except KeyboardInterrupt: + print("Keyhoard Interrupt") + break try: ev = self.events.get_nowait() @@ -62,25 +184,11 @@ class RTLSDR_CIR_Runner(mp.Process): except mp.queues.Empty: pass - def do_one_cir_run(self): - # Build the rtl_sdr command line from the settings in config - rtl_sdr_cmdline = shlex.split("rtl_sdr -f {} -s {} -g {} -S -".format(self.freq, self.rate, self.gain)) - dd_cmdline = shlex.split("dd of={} bs=2 count={}".format(self.iq_file, self.samps)) - - # To avoid calling the shell, we do the pipe between rtlsdr and dd using Popen - rtlsdr_proc = subprocess.Popen(rtl_sdr_cmdline, stdout=subprocess.PIPE) - dd_proc = subprocess.Popen(dd_cmdline, stdin=rtlsdr_proc.stdout) - - # close our connection to the pipe so that rtlsdr gets the SIGPIPE - rtlsdr_proc.stdout.close() - - dd_proc.communicate() - dd_proc.wait() - rtlsdr_proc.wait() + self.receiver.stop() - # The RTLSDR outputs u8 format + def do_one_cir_run(self, iq_data): print("Starting correlation") - cir_corr = correlate_with_ref.CIR_Correlate(self.iq_file, "u8") + cir_corr = correlate_with_ref.CIR_Correlate(iq_data=iq_data, iq_format="u8") title = "Correlation on {}kHz done at {}".format( int(self.freq / 1000), @@ -131,4 +239,3 @@ if __name__ == '__main__': run(host=cli_args.host, port=int(cli_args.port), debug=True, reloader=False) finally: rtlsdr_cir.stop() - rtlsdr_cir.join() diff --git a/correlate_with_ref.py b/correlate_with_ref.py index ab55682..23c9c5c 100755 --- a/correlate_with_ref.py +++ b/correlate_with_ref.py @@ -33,13 +33,27 @@ T_NULL = 2656 T_TF = 196608 class CIR_Correlate: - def __init__(self, iq_filename, iq_format): - """Read phase reference from fixed file and load IQ data from - iq_filename. iq_format must be fc64 or u8""" + def __init__(self, iq_filename="", iq_format=None, iq_data=None): + """Either call with iq_filename, or with iq_data containing + a np.array with the data. + + This class will then read phase reference from fixed file and + load IQ data from iq_filename, or use iq_data directly. + + iq_format must be fc64 or u8""" + + if iq_format is None: + raise ValueError("Incorrect initialisation") + self.phase_ref = np.fromfile("phasereference.2048000.fc64.iq", np.complex64) if iq_format == "u8": - channel_u8_interleaved = np.fromfile(iq_filename, np.uint8) + if iq_filename: + channel_u8_interleaved = np.fromfile(iq_filename, np.uint8) + elif iq_data is not None: + channel_u8_interleaved = iq_data + else: + raise ValueError("Must give iq_filename or iq_data") channel_u8_iq = channel_u8_interleaved.reshape(int(len(channel_u8_interleaved)/2), 2) # This directly converts to fc64 channel_fc64_unscaled = channel_u8_iq[...,0] + np.complex64(1j) * channel_u8_iq[...,1] @@ -47,7 +61,12 @@ class CIR_Correlate: channel_fc64_dc_comp = channel_fc64_scaled - np.average(channel_fc64_scaled) self.channel_out = channel_fc64_dc_comp elif iq_format == "fc64": - self.channel_out = np.fromfile(iq_filename, np.complex64) + if iq_filename: + self.channel_out = np.fromfile(iq_filename, np.complex64) + elif iq_data is not None: + self.channel_out = iq_data + else: + raise ValueError("Must give iq_filename or iq_data") else: raise ValueError("Unsupported format {}".format(iq_format)) -- cgit v1.2.3