diff options
Diffstat (limited to 'self_test.py')
-rw-r--r-- | self_test.py | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/self_test.py b/self_test.py new file mode 100644 index 0000000..e18a933 --- /dev/null +++ b/self_test.py @@ -0,0 +1,81 @@ +import SoapySDR +from SoapySDR import * #SOAPY_SDR_* constants +import numpy as np +import time + +if __name__ == "__main__": + dummy = SoapySDR.Device(dict(driver="dummy")) + print(dummy) + + dummy.setSampleRate(SOAPY_SDR_RX, 0, 8e6) + dummy.setSampleRate(SOAPY_SDR_TX, 0, 8e6) + + """ + for i in range(5): + print(" Make rx stream #%d"%i) + rxStream = dummy.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [0]) + for j in range(5): + numSampsTotal = 10000 + print(" Activate, get %d samples, Deactivate #%d"%(numSampsTotal, j)) + dummy.activateStream(rxStream) + buff = np.array([0]*1024, np.complex64) + while numSampsTotal > 0: + sr = dummy.readStream(rxStream, [buff], buff.size, timeoutUs=int(1e6)) + #print sr + assert(sr.ret > 0) + numSampsTotal -= sr.ret + dummy.deactivateStream(rxStream) + dummy.closeStream(rxStream) + + for i in range(5): + print(" Make tx stream #%d"%i) + txStream = dummy.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [0]) + for j in range(5): + numSampsTotal = 10000 + print(" Activate, send %d samples, Deactivate #%d"%(numSampsTotal, j)) + dummy.activateStream(txStream) + buff = np.array([0]*1024, np.complex64) + while numSampsTotal != 0: + size = min(buff.size, numSampsTotal) + sr = dummy.writeStream(txStream, [buff], size) + #print sr + if not (sr.ret > 0): print("Fail %s, %d"%(sr, numSampsTotal)) + assert(sr.ret > 0) + numSampsTotal -= sr.ret + dummy.deactivateStream(txStream) + dummy.closeStream(txStream) + """ + + #################################################################### + #setup both streams at once + #################################################################### + rxStream = dummy.setupStream(SOAPY_SDR_RX, SOAPY_SDR_CF32, [0]) + txStream = dummy.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [0]) + + dummy.activateStream(rxStream) + dummy.activateStream(txStream) + + numSampsTotal = 10000 + dummy.activateStream(rxStream) + buff = np.array([0]*1024, np.complex64) + while numSampsTotal > 0: + sr = dummy.readStream(rxStream, [buff], buff.size, timeoutUs=int(1e6)) + #print(sr) + assert(sr.ret > 0) + numSampsTotal -= sr.ret + + numSampsTotal = 10000 + buff = np.array([0]*1024, np.complex64) + while numSampsTotal != 0: + size = min(buff.size, numSampsTotal) + sr = dummy.writeStream(txStream, [buff], size) + #print(sr) + if not (sr.ret > 0): print("Fail %s, %d"%(sr, numSampsTotal)) + assert(sr.ret > 0) + numSampsTotal -= sr.ret + + dummy.deactivateStream(rxStream) + dummy.deactivateStream(txStream) + + dummy.closeStream(rxStream) + dummy.closeStream(txStream) |