diff options
Diffstat (limited to 'src/OutputUHDFeedback.cpp')
-rw-r--r-- | src/OutputUHDFeedback.cpp | 120 |
1 files changed, 99 insertions, 21 deletions
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index 73497a1..8584839 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -40,6 +40,7 @@ DESCRIPTION: #include <complex> #include <uhd/types/stream_cmd.hpp> #include <sys/socket.h> +#include <poll.h> #include "OutputUHDFeedback.h" #include "Utils.h" @@ -48,7 +49,7 @@ typedef std::complex<float> complexf; OutputUHDFeedback::OutputUHDFeedback() { - m_running = false; + m_running.store(false); } void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, uint32_t sampleRate) @@ -59,7 +60,7 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, u if (port) { m_port = port; - m_running = true; + m_running.store(true); rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this); @@ -68,7 +69,11 @@ void OutputUHDFeedback::setup(uhd::usrp::multi_usrp::sptr usrp, uint16_t port, u OutputUHDFeedback::~OutputUHDFeedback() { - m_running = false; + m_running.store(false); + + rx_burst_thread.interrupt(); + burst_tcp_thread.interrupt(); + rx_burst_thread.join(); burst_tcp_thread.join(); } @@ -79,9 +84,13 @@ void OutputUHDFeedback::set_tx_frame( { boost::mutex::scoped_lock lock(burstRequest.mutex); + assert(buf.size() % sizeof(complexf) == 0); + if (burstRequest.state == BurstRequestState::SaveTransmitFrame) { const size_t n = std::min( - burstRequest.frame_length * sizeof(complexf), buf.size()); + burstRequest.num_samples * sizeof(complexf), buf.size()); + + burstRequest.num_samples = n / sizeof(complexf); burstRequest.tx_samples.clear(); burstRequest.tx_samples.resize(n); @@ -129,7 +138,7 @@ void OutputUHDFeedback::ReceiveBurstThread() uhd::stream_cmd_t cmd( uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); - cmd.num_samps = burstRequest.frame_length; + cmd.num_samps = burstRequest.num_samples; cmd.stream_now = false; double pps = burstRequest.rx_pps / 16384000.0; @@ -138,9 +147,10 @@ void OutputUHDFeedback::ReceiveBurstThread() rxStream->issue_stream_cmd(cmd); uhd::rx_metadata_t md; - burstRequest.rx_samples.resize(burstRequest.frame_length * sizeof(complexf)); - rxStream->recv(&burstRequest.rx_samples[0], burstRequest.frame_length, md); + burstRequest.rx_samples.resize(burstRequest.num_samples * sizeof(complexf)); + rxStream->recv(&burstRequest.rx_samples[0], burstRequest.num_samples, md); + // The recv might have happened at another time than requested burstRequest.rx_second = md.time_spec.get_full_secs(); burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; @@ -151,13 +161,32 @@ void OutputUHDFeedback::ReceiveBurstThread() } } +static int accept_with_timeout(int server_socket, int timeout_ms, struct sockaddr_in *client) +{ + struct pollfd fds[1]; + fds[0].fd = server_socket; + fds[0].events = POLLIN | POLLOUT; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + throw std::runtime_error("TCP Socket accept error: " + to_string(errno)); + } + else if (retval) { + socklen_t client_len = sizeof(struct sockaddr_in); + return accept(server_socket, (struct sockaddr*)&client, &client_len); + } + else { + return -2; + } +} + void OutputUHDFeedback::ServeFeedbackThread() { set_thread_name("uhdservefeedback"); - int server_sock = -1; try { - if ((server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { throw std::runtime_error("Can't create TCP socket"); } @@ -166,31 +195,32 @@ void OutputUHDFeedback::ServeFeedbackThread() addr.sin_port = htons(m_port); addr.sin_addr.s_addr = htonl(INADDR_ANY); - if (bind(server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { throw std::runtime_error("Can't bind TCP socket"); } - if (listen(server_sock, 1) < 0) { + if (listen(m_server_sock, 1) < 0) { throw std::runtime_error("Can't listen TCP socket"); } while (m_running) { struct sockaddr_in client; - socklen_t client_len = sizeof(client); - int client_sock = accept(server_sock, - (struct sockaddr*)&client, &client_len); + int client_sock = accept_with_timeout(m_server_sock, 1000, &client); - if (client_sock < 0) { + if (client_sock == -1) { throw runtime_error("Could not establish new connection"); } + else if (client_sock == -2) { + continue; + } while (m_running) { uint8_t request_version = 0; - int read = recv(client_sock, &request_version, 1, 0); + ssize_t read = recv(client_sock, &request_version, 1, 0); if (!read) break; // done reading if (read < 0) { etiLog.level(info) << - "DPD Feedback Server Client read request verson failed"; + "DPD Feedback Server Client read request version failed"; } if (request_version != 1) { @@ -209,7 +239,7 @@ void OutputUHDFeedback::ServeFeedbackThread() // We are ready to issue the request now { boost::mutex::scoped_lock lock(burstRequest.mutex); - burstRequest.frame_length = num_samples; + burstRequest.num_samples = num_samples; burstRequest.state = BurstRequestState::SaveTransmitFrame; lock.unlock(); @@ -226,6 +256,15 @@ void OutputUHDFeedback::ServeFeedbackThread() lock.unlock(); if (send(client_sock, + &burstRequest.num_samples, + sizeof(burstRequest.num_samples), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send num_samples failed"; + break; + } + + if (send(client_sock, &burstRequest.tx_second, sizeof(burstRequest.tx_second), 0) < 0) { @@ -243,7 +282,45 @@ void OutputUHDFeedback::ServeFeedbackThread() break; } -#warning "Send buf" + const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); + + assert(burstRequest.tx_samples.size() == frame_bytes); + if (send(client_sock, + &burstRequest.tx_samples[0], + frame_bytes, + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_frame failed"; + break; + } + + if (send(client_sock, + &burstRequest.rx_second, + sizeof(burstRequest.rx_second), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_second failed"; + break; + } + + if (send(client_sock, + &burstRequest.rx_pps, + sizeof(burstRequest.rx_pps), + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_pps failed"; + break; + } + + assert(burstRequest.rx_samples.size() == frame_bytes); + if (send(client_sock, + &burstRequest.rx_samples[0], + frame_bytes, + 0) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_frame failed"; + break; + } } close(client_sock); @@ -255,8 +332,9 @@ void OutputUHDFeedback::ServeFeedbackThread() m_running = false; - if (server_sock != -1) { - close(server_sock); + if (m_server_sock != -1) { + close(m_server_sock); + m_server_sock = -1; } } |