diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-10-30 14:19:26 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2015-10-30 14:19:26 +0100 |
commit | 0bd1e4e64984cb51e65d6f0e8db92f44934eff59 (patch) | |
tree | 438d5937a0eba1de38cf19505257bee1e52d609b | |
parent | 3ade194073d5d20ace17986c148800bf2a3554ef (diff) | |
download | odr-dpd-0bd1e4e64984cb51e65d6f0e8db92f44934eff59.tar.gz odr-dpd-0bd1e4e64984cb51e65d6f0e8db92f44934eff59.tar.bz2 odr-dpd-0bd1e4e64984cb51e65d6f0e8db92f44934eff59.zip |
Add working ZMQ input
-rw-r--r-- | CMakeLists.txt | 3 | ||||
-rw-r--r-- | OutputUHD.cpp | 6 | ||||
-rw-r--r-- | OutputUHD.hpp | 4 | ||||
-rw-r--r-- | main.cpp | 66 |
4 files changed, 59 insertions, 20 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e1ed2d..e4a53a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,7 @@ add_definitions(-Wall) find_package(PkgConfig) pkg_check_modules(UHD uhd) +pkg_check_modules(ZMQ libzmq) # Threads find_package(Threads REQUIRED) @@ -63,7 +64,7 @@ list(APPEND odrdpd_sources OutputUHD.cpp ) -list(APPEND common_link_list ${UHD_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +list(APPEND common_link_list ${UHD_LIBRARIES} ${ZMQ_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) set_source_files_properties( ${odrdpd_sources} diff --git a/OutputUHD.cpp b/OutputUHD.cpp index c337262..a56112d 100644 --- a/OutputUHD.cpp +++ b/OutputUHD.cpp @@ -37,7 +37,7 @@ using namespace std; -void OutputUHD::Init(double txgain) +OutputUHD::OutputUHD(double txgain) { m_txgain = txgain; @@ -72,7 +72,7 @@ void OutputUHD::Init(double txgain) myTxStream = m_usrp->get_tx_stream(stream_args); } -size_t OutputUHD::Transmit(std::vector<complexf> samples, size_t sizeIn, double *first_sample_time) +size_t OutputUHD::Transmit(const complexf *samples, size_t sizeIn, double *first_sample_time) { const double tx_timeout = 20.0; @@ -91,7 +91,7 @@ size_t OutputUHD::Transmit(std::vector<complexf> samples, size_t sizeIn, double num_acc_samps += num_tx_samps; - md.time_spec += uhd::time_spec_t(0, num_acc_samps/m_samplerate); + md.time_spec += uhd::time_spec_t(0, num_tx_samps/m_samplerate); } return num_acc_samps; diff --git a/OutputUHD.hpp b/OutputUHD.hpp index 007e51f..ad95e99 100644 --- a/OutputUHD.hpp +++ b/OutputUHD.hpp @@ -31,9 +31,9 @@ typedef std::complex<float> complexf; class OutputUHD { public: - void Init(double txgain); + OutputUHD(double txgain); - size_t Transmit(std::vector<complexf> samples, size_t sizeIn, double *first_sample_time); + size_t Transmit(const complexf *samples, size_t sizeIn, double *first_sample_time); private: double m_samplerate; @@ -23,6 +23,7 @@ #include "OutputUHD.hpp" #include "utils.hpp" +#include <zmq.hpp> size_t read_samples(FILE* fd, std::vector<complexf>& samples, size_t count) { @@ -31,10 +32,10 @@ size_t read_samples(FILE* fd, std::vector<complexf>& samples, size_t count) samples.resize(count); } - size_t num_read = fread((char*)&samples.front(), sizeof(complexf), count, fd); + size_t num_read = fread(&samples.front(), sizeof(complexf), count, fd); if (num_read == 0) { rewind(fd); - num_read = fread((char*)&samples.front(), sizeof(complexf), count, fd); + num_read = fread(&samples.front(), sizeof(complexf), count, fd); } return num_read; @@ -44,23 +45,41 @@ int main(int argc, char **argv) { double txgain = 0; - if (argc == 1) { - txgain = strtod(argv[1], nullptr); + if (argc == 3) { + txgain = strtod(argv[2], nullptr); if (!(0 <= txgain and txgain < 80)) { MDEBUG("txgain wrong: %f\n", txgain); return -1; } } + MDEBUG("TX Gain is %f\n", txgain); + + if (argc < 2) { + MDEBUG("Require input file or url\n"); + return -1; + } + + std::string uri = argv[1]; + const size_t samps_per_buffer = 20480; - OutputUHD output_uhd; - output_uhd.Init(txgain); + OutputUHD output_uhd(txgain); - FILE* fd = fopen("input.iq", "rb"); - if (!fd) { - MDEBUG("Could not open file\n"); - return -1; + zmq::context_t ctx; + zmq::socket_t zmq_sock(ctx, ZMQ_SUB); + + FILE* fd = nullptr; + if (uri.find("tcp://") != 0) { + fd = fopen(uri.c_str(), "rb"); + if (!fd) { + MDEBUG("Could not open file\n"); + return -1; + } + } + else { + zmq_sock.connect(uri); + zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); } std::vector<complexf> input_samples(samps_per_buffer); @@ -68,12 +87,31 @@ int main(int argc, char **argv) size_t total_samps_read = samps_read; double last_print_time = 0; + size_t sent = 0; do { - samps_read = read_samples(fd, input_samples, samps_per_buffer); - double first_sample_time = 0; - output_uhd.Transmit(input_samples, samps_read, &first_sample_time); + + if (fd) { + samps_read = read_samples(fd, input_samples, samps_per_buffer); + sent = output_uhd.Transmit(&input_samples.front(), samps_read, &first_sample_time); + } + else { + zmq::message_t msg; + if (not zmq_sock.recv(&msg)) { + MDEBUG("zmq recv error\n"); + return -1; + } + + if (msg.size() % sizeof(complexf) != 0) { + MDEBUG("Received incomplete size %zu\n", msg.size()); + return -1; + } + samps_read = msg.size() / sizeof(complexf); + + sent = output_uhd.Transmit((complexf*)msg.data(), samps_read, &first_sample_time); + } + if (first_sample_time - last_print_time > 1) { MDEBUG("Tx %zu samples at t=%f\n", samps_read, first_sample_time); last_print_time = first_sample_time; @@ -81,7 +119,7 @@ int main(int argc, char **argv) total_samps_read += samps_read; } - while (samps_read); + while (samps_read and sent); MDEBUG("Read %zu samples in total\n", total_samps_read); } |