"""Streaming smoke test against the SoapySDR device created with driver="dummy".

When run as a script this opens the device, sets the RX and TX sample rate on
channel 0, sets up one RX and one TX stream at the same time, reads
NUM_SAMPS_TOTAL samples from the RX stream, writes NUM_SAMPS_TOTAL samples to
the TX stream, and then deactivates and closes both streams.

Set RUN_CYCLE_TESTS to True to additionally run the repeated
setup/activate/deactivate/close cycles, which are disabled by default.
"""
import SoapySDR
from SoapySDR import * #SOAPY_SDR_* constants
import numpy as np
import time

# Sample rate applied to channel 0 in both directions.
SAMPLE_RATE = 8e6
# Number of complex64 elements in the transfer buffer.
BUFFER_LEN = 1024
# Number of samples moved per activation, in each direction.
NUM_SAMPS_TOTAL = 10000
# Timeout handed to readStream, in microseconds.
READ_TIMEOUT_US = int(1e6)
# The repeated stream-cycling tests are off by default.
RUN_CYCLE_TESTS = False


def _receive_samples(device, stream, num_samps_total):
    """Read from *stream* until at least *num_samps_total* samples arrived.

    Every read requests a full buffer, so the final read may return more
    samples than were still outstanding.

    Raises:
        RuntimeError: if a readStream call reports a non-positive ``ret``.
    """
    buff = np.zeros(BUFFER_LEN, np.complex64)
    remaining = num_samps_total
    while remaining > 0:
        sr = device.readStream(
            stream, [buff], buff.size, timeoutUs=READ_TIMEOUT_US)
        # An explicit raise (rather than assert) keeps the check alive under
        # ``python -O``; without it a failing read would loop forever.
        if not sr.ret > 0:
            raise RuntimeError(
                "readStream failed: %s, %d samples remaining" % (sr, remaining))
        remaining -= sr.ret


def _transmit_samples(device, stream, num_samps_total):
    """Write *num_samps_total* zero-valued samples to *stream*.

    Each write is capped at the number of samples still outstanding.

    Raises:
        RuntimeError: if a writeStream call reports a non-positive ``ret``.
    """
    buff = np.zeros(BUFFER_LEN, np.complex64)
    remaining = num_samps_total
    # ``> 0`` instead of ``!= 0`` so an overshoot cannot spin with a negative
    # request size.
    while remaining > 0:
        size = min(buff.size, remaining)
        sr = device.writeStream(stream, [buff], size)
        if not sr.ret > 0:
            raise RuntimeError("Fail %s, %d" % (sr, remaining))
        remaining -= sr.ret


def _cycle_stream(device, direction, transfer, make_msg, activate_msg,
                  repeats=5):
    """Repeatedly set up a stream and run activate/transfer/deactivate on it.

    Args:
        device: the opened SoapySDR device.
        direction: SOAPY_SDR_RX or SOAPY_SDR_TX.
        transfer: callable ``(device, stream, num_samps_total)`` that moves
            the samples while the stream is active.
        make_msg: %-format string printed per stream; receives the stream
            index.
        activate_msg: %-format string printed per activation; receives the
            sample count and the activation index.
        repeats: number of streams to make, and activations per stream.
    """
    for i in range(repeats):
        print(make_msg % i)
        stream = device.setupStream(direction, SOAPY_SDR_CF32, [0])
        try:
            for j in range(repeats):
                print(activate_msg % (NUM_SAMPS_TOTAL, j))
                device.activateStream(stream)
                try:
                    transfer(device, stream, NUM_SAMPS_TOTAL)
                finally:
                    device.deactivateStream(stream)
        finally:
            device.closeStream(stream)


def main():
    """Open the device and run the simultaneous RX/TX streaming test."""
    dummy = SoapySDR.Device(dict(driver="dummy"))
    print(dummy)

    dummy.setSampleRate(SOAPY_SDR_RX, 0, SAMPLE_RATE)
    dummy.setSampleRate(SOAPY_SDR_TX, 0, SAMPLE_RATE)

    if RUN_CYCLE_TESTS:
        _cycle_stream(
            dummy, SOAPY_SDR_RX, _receive_samples,
            " Make rx stream #%d",
            " Activate, get %d samples, Deactivate #%d")
        _cycle_stream(
            dummy, SOAPY_SDR_TX, _transmit_samples,
            " Make tx stream #%d",
            " Activate, send %d samples, Deactivate #%d")

    ####################################################################
    #setup both streams at once
    ####################################################################
    rxStream = dummy.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [0])
    txStream = dummy.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [0])
    try:
        # Each stream is activated exactly once before any transfer.
        dummy.activateStream(rxStream)
        dummy.activateStream(txStream)
        try:
            _receive_samples(dummy, rxStream, NUM_SAMPS_TOTAL)
            _transmit_samples(dummy, txStream, NUM_SAMPS_TOTAL)
        finally:
            dummy.deactivateStream(rxStream)
            dummy.deactivateStream(txStream)
    finally:
        # Close the streams even when a transfer failed.
        dummy.closeStream(rxStream)
        dummy.closeStream(txStream)


if __name__ == "__main__":
    main()