From 058809e6678a22e0448d5de718cb6d4864bb9cb5 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 12 Jun 2015 09:40:37 +0200 Subject: Cleanup OutputUHD structure --- src/OutputUHD.cpp | 531 +++++++++++++++++++++++++----------------------------- src/OutputUHD.h | 26 +++ 2 files changed, 269 insertions(+), 288 deletions(-) (limited to 'src') diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp index 72d4976..cc6c0b6 100644 --- a/src/OutputUHD.cpp +++ b/src/OutputUHD.cpp @@ -449,63 +449,40 @@ bool check_gps_fix_ok(struct UHDWorkerData *uwd) void UHDWorker::process() { - int workerbuffer = 0; - time_t tx_second = 0; - double pps_offset = 0; - double last_pps = 2.0; - double usrp_time; + int workerbuffer = 0; + tx_second = 0; + pps_offset = 0.0; + last_pps = 2.0; // Variables needed for GPS fix check - double last_gps_fix_check = 0.0; - const double gps_fix_check_interval = 10.0; // seconds - int num_checks_without_gps_fix = 0; - boost::packaged_task gps_fix_pt; - boost::unique_future gps_fix_future; - boost::thread gps_fix_task; - - // Asynchronous message statistics - int num_underflows = 0; - int num_late_packets = 0; - - //const struct timespec hundred_nano = {0, 100}; - - size_t sizeIn; - struct UHDWorkerFrameData* frame; - - size_t num_acc_samps; //number of accumulated samples - //int write_fail_count; - - // Transmit timeout - const double timeout = 20.0; + last_gps_fix_check = 0.0; + num_checks_without_gps_fix = 0; #if FAKE_UHD == 0 uhd::stream_args_t stream_args("fc32"); //complex floats - uhd::tx_streamer::sptr myTxStream = uwd->myUsrp->get_tx_stream(stream_args); - size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); -#else - size_t usrp_max_num_samps = 2048; // arbitrarily chosen + myTxStream = uwd->myUsrp->get_tx_stream(stream_args); #endif - const complexf* in; - - uhd::tx_metadata_t md; md.start_of_burst = false; - md.end_of_burst = false; + md.end_of_burst = false; + + expected_next_fct = -1; - int expected_next_fct = -1; + num_underflows = 0; + num_late_packets = 0; while (uwd->running) { - bool fct_discontinuity = false; - md.has_time_spec = false; - md.time_spec = uhd::time_spec_t(0.0); - num_acc_samps = 0; - //write_fail_count = 0; + fct_discontinuity = false; + md.has_time_spec = false; + md.time_spec = uhd::time_spec_t(0.0); /* Wait for barrier */ // this wait will hopefully always be the second one // because modulation should be quicker than transmission uwd->sync_barrier.get()->wait(); + struct UHDWorkerFrameData* frame; + if (workerbuffer == 0) { frame = &(uwd->frame0); } @@ -517,306 +494,284 @@ void UHDWorker::process() "UHDWorker.process: workerbuffer is neither 0 nor 1 !"); } - in = reinterpret_cast(frame->buf); - pps_offset = frame->ts.timestamp_pps_offset; + handle_frame(frame); - // Tx second from MNSC - tx_second = frame->ts.timestamp_sec; + // swap buffers + workerbuffer = (workerbuffer + 1) % 2; + } +} - sizeIn = uwd->bufsize / sizeof(complexf); +void UHDWorker::handle_frame(const struct UHDWorkerFrameData *frame) +{ + const double gps_fix_check_interval = 10.0; // seconds - /* Verify that the FCT value is correct. If we miss one transmission - * frame we must interrupt UHD and resync to the timestamps - */ - if (frame->ts.fct == -1) { - etiLog.level(info) << - "OutputUHD: dropping one frame with invalid FCT"; - goto loopend; - } - if (expected_next_fct != -1) { - if (expected_next_fct != (int)frame->ts.fct) { - etiLog.level(warn) << - "OutputUHD: Incorrect expect fct " << frame->ts.fct << - ", expected " << expected_next_fct; + pps_offset = frame->ts.timestamp_pps_offset; - fct_discontinuity = true; - throw fct_discontinuity_error(); - } - } + // Tx second from MNSC + tx_second = frame->ts.timestamp_sec; - expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250; + /* Verify that the FCT value is correct. If we miss one transmission + * frame we must interrupt UHD and resync to the timestamps + */ + if (frame->ts.fct == -1) { + etiLog.level(info) << + "OutputUHD: dropping one frame with invalid FCT"; + return; + } + if (expected_next_fct != -1) { + if (expected_next_fct != (int)frame->ts.fct) { + etiLog.level(warn) << + "OutputUHD: Incorrect expect fct " << frame->ts.fct << + ", expected " << expected_next_fct; + + fct_discontinuity = true; + throw fct_discontinuity_error(); + } + } - // Check for ref_lock - if (uwd->check_refclk_loss) { - try { - // TODO: Is this check specific to the B100 and USRP2 ? - if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) { - etiLog.log(alert, - "OutputUHD: External reference clock lock lost !"); - if (uwd->refclk_lock_loss_behaviour == CRASH) { - throw std::runtime_error( - "OutputUHD: External reference clock lock lost."); - } + expected_next_fct = (frame->ts.fct + uwd->fct_increment) % 250; + + // Check for ref_lock + if (uwd->check_refclk_loss) { + try { + // TODO: Is this check specific to the B100 and USRP2 ? + if (! uwd->myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) { + etiLog.log(alert, + "OutputUHD: External reference clock lock lost !"); + if (uwd->refclk_lock_loss_behaviour == CRASH) { + throw std::runtime_error( + "OutputUHD: External reference clock lock lost."); } } - catch (uhd::lookup_error &e) { - uwd->check_refclk_loss = false; - etiLog.log(warn, - "OutputUHD: This USRP does not have mboard sensor for ext clock loss." - " Check disabled."); - } } + catch (uhd::lookup_error &e) { + uwd->check_refclk_loss = false; + etiLog.log(warn, + "OutputUHD: This USRP does not have mboard sensor for ext clock loss." + " Check disabled."); + } + } - usrp_time = uwd->myUsrp->get_time_now().get_real_secs(); + double usrp_time = uwd->myUsrp->get_time_now().get_real_secs(); - if (uwd->check_gpsfix and - // Divide interval by two because we alternate between - // launch and check - last_gps_fix_check + gps_fix_check_interval/2.0 < usrp_time) { - last_gps_fix_check = usrp_time; + if (uwd->check_gpsfix and + // Divide interval by two because we alternate between + // launch and check + last_gps_fix_check + gps_fix_check_interval/2.0 < usrp_time) { + last_gps_fix_check = usrp_time; - // Alternate between launching thread and checking the - // result. - if (gps_fix_task.joinable()) { - if (gps_fix_future.has_value()) { + // Alternate between launching thread and checking the + // result. + if (gps_fix_task.joinable()) { + if (gps_fix_future.has_value()) { - gps_fix_future.wait(); + gps_fix_future.wait(); - gps_fix_task.join(); + gps_fix_task.join(); - if (not gps_fix_future.get()) { - if (not num_checks_without_gps_fix) { - etiLog.level(alert) << - "OutputUHD: GPS Fix lost"; - } - num_checks_without_gps_fix++; + if (not gps_fix_future.get()) { + if (num_checks_without_gps_fix == 0) { + etiLog.level(alert) << + "OutputUHD: GPS Fix lost"; } - else { - if (num_checks_without_gps_fix) { - etiLog.level(info) << - "OutputUHD: GPS Fix recovered"; - } - num_checks_without_gps_fix = 0; + num_checks_without_gps_fix++; + } + else { + if (num_checks_without_gps_fix) { + etiLog.level(info) << + "OutputUHD: GPS Fix recovered"; } + num_checks_without_gps_fix = 0; + } - if (gps_fix_check_interval * num_checks_without_gps_fix > - uwd->max_gps_holdover) { - std::stringstream ss; - ss << "Lost GPS fix for " << gps_fix_check_interval * - num_checks_without_gps_fix << " seconds"; - throw std::runtime_error(ss.str()); - } + if (gps_fix_check_interval * num_checks_without_gps_fix > + uwd->max_gps_holdover) { + std::stringstream ss; + ss << "Lost GPS fix for " << gps_fix_check_interval * + num_checks_without_gps_fix << " seconds"; + throw std::runtime_error(ss.str()); } } - else { - // Checking the sensor here takes too much - // time, it has to be done in a separate thread. - gps_fix_pt = boost::packaged_task( - boost::bind(check_gps_fix_ok, uwd) ); + } + else { + // Checking the sensor here takes too much + // time, it has to be done in a separate thread. + gps_fix_pt = boost::packaged_task( + boost::bind(check_gps_fix_ok, uwd) ); - gps_fix_future = gps_fix_pt.get_future(); + gps_fix_future = gps_fix_pt.get_future(); - gps_fix_task = boost::thread(boost::move(gps_fix_pt)); - } + gps_fix_task = boost::thread(boost::move(gps_fix_pt)); } + } - if (uwd->sourceContainsTimestamp) { - if (!frame->ts.timestamp_valid) { - /* We have not received a full timestamp through - * MNSC. We sleep through the frame. - */ - etiLog.level(info) << - "OutputUHD: Throwing sample " << frame->ts.fct << - " away: incomplete timestamp " << tx_second << - " + " << pps_offset; - usleep(20000); //TODO should this be TM-dependant ? - goto loopend; - } - - md.has_time_spec = true; - md.time_spec = uhd::time_spec_t(tx_second, pps_offset); - - // md is defined, let's do some checks - if (md.time_spec.get_real_secs() + timeout < usrp_time) { - etiLog.level(warn) << - "OutputUHD: Timestamp in the past! offset: " << - md.time_spec.get_real_secs() - usrp_time << - " (" << usrp_time << ")" - " frame " << frame->ts.fct << - ", tx_second " << tx_second << - ", pps " << pps_offset; - goto loopend; //skip the frame - } + if (uwd->sourceContainsTimestamp) { + if (!frame->ts.timestamp_valid) { + /* We have not received a full timestamp through + * MNSC. We sleep through the frame. + */ + etiLog.level(info) << + "OutputUHD: Throwing sample " << frame->ts.fct << + " away: incomplete timestamp " << tx_second << + " + " << pps_offset; + usleep(20000); //TODO should this be TM-dependant ? + return; + } -#if 0 // Let uhd handle this - if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_MARGIN_FUTURE) { - etiLog.level(warn) << - "OutputUHD: Timestamp too far in the future! offset: " << - md.time_spec.get_real_secs() - usrp_time; - usleep(20000); //sleep so as to fill buffers - } -#endif + md.has_time_spec = true; + md.time_spec = uhd::time_spec_t(tx_second, pps_offset); + + // md is defined, let's do some checks + if (md.time_spec.get_real_secs() + tx_timeout < usrp_time) { + etiLog.level(warn) << + "OutputUHD: Timestamp in the past! offset: " << + md.time_spec.get_real_secs() - usrp_time << + " (" << usrp_time << ")" + " frame " << frame->ts.fct << + ", tx_second " << tx_second << + ", pps " << pps_offset; + return; + } - if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { - etiLog.level(error) << - "OutputUHD: Timestamp way too far in the future! offset: " << - md.time_spec.get_real_secs() - usrp_time; - throw std::runtime_error("Timestamp error. Aborted."); - } + if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { + etiLog.level(error) << + "OutputUHD: Timestamp way too far in the future! offset: " << + md.time_spec.get_real_secs() - usrp_time; + throw std::runtime_error("Timestamp error. Aborted."); } - else { // !uwd->sourceContainsTimestamp - if (uwd->muting || uwd->muteNoTimestamps) { - /* There was some error decoding the timestamp - */ - if (uwd->muting) { - etiLog.log(info, - "OutputUHD: Muting sample %d requested\n", - frame->ts.fct); - } - else { - etiLog.log(info, - "OutputUHD: Muting sample %d : no timestamp\n", - frame->ts.fct); - } - usleep(20000); - goto loopend; + } + else { // !uwd->sourceContainsTimestamp + if (uwd->muting || uwd->muteNoTimestamps) { + /* There was some error decoding the timestamp + */ + if (uwd->muting) { + etiLog.log(info, + "OutputUHD: Muting sample %d requested\n", + frame->ts.fct); } + else { + etiLog.log(info, + "OutputUHD: Muting sample %d : no timestamp\n", + frame->ts.fct); + } + usleep(20000); + return; } + } - PDEBUG("UHDWorker::process:max_num_samps: %zu.\n", - usrp_max_num_samps); + tx_frame(frame); - while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) { - size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); + if (last_pps > pps_offset) { + if (num_underflows or num_late_packets) { + etiLog.log(info, + "OutputUHD status (usrp time: %f): " + "%d underruns and %d late packets since last status.\n", + usrp_time, + num_underflows, num_late_packets); + } + num_underflows = 0; + num_late_packets = 0; + } - //ensure the the last packet has EOB set if the timestamps has been - //refreshed and need to be reconsidered. - //Also, if we saw that the FCT did not increment as expected, which - //could be due to a lost incoming packet. - md.end_of_burst = ( - uwd->sourceContainsTimestamp && - (frame->ts.timestamp_refresh || fct_discontinuity) && - samps_to_send <= usrp_max_num_samps ); + last_pps = pps_offset; +} +void UHDWorker::tx_frame(const struct UHDWorkerFrameData *frame) +{ + const size_t sizeIn = uwd->bufsize / sizeof(complexf); + const complexf* in_data = reinterpret_cast(frame->buf); -#if FAKE_UHD - // This is probably very approximate - usleep( (1000000 / uwd->sampleRate) * samps_to_send); - size_t num_tx_samps = samps_to_send; +#if FAKE_UHD == 0 + size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); #else - //send a single packet - size_t num_tx_samps = myTxStream->send( - &in[num_acc_samps], - samps_to_send, md, timeout); + size_t usrp_max_num_samps = 2048; // arbitrarily chosen #endif - num_acc_samps += num_tx_samps; + size_t num_acc_samps = 0; //number of accumulated samples + while (uwd->running && !uwd->muting && (num_acc_samps < sizeIn)) { + size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); - md.time_spec = uhd::time_spec_t(tx_second, pps_offset) - + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate); + //ensure the the last packet has EOB set if the timestamps has been + //refreshed and need to be reconsidered. + //Also, if we saw that the FCT did not increment as expected, which + //could be due to a lost incoming packet. + md.end_of_burst = ( + uwd->sourceContainsTimestamp && + (frame->ts.timestamp_refresh || fct_discontinuity) && + samps_to_send <= usrp_max_num_samps ); - /* - fprintf(stderr, "*** pps_offset %f, md.time_spec %f, usrp->now %f\n", - pps_offset, - md.time_spec.get_real_secs(), - uwd->myUsrp->get_time_now().get_real_secs()); - // */ - - if (num_tx_samps == 0) { -#if 1 - etiLog.log(warn, - "UHDWorker::process() unable to write to device, skipping frame!\n"); - break; +#if FAKE_UHD + // This is probably very approximate + usleep( (1000000 / uwd->sampleRate) * samps_to_send); + size_t num_tx_samps = samps_to_send; #else - // This has been disabled, because if there is a write failure, - // we'd better not insist and try to go on transmitting future - // frames. - // The goal is not to try to send by all means possible. It's - // more important to make sure the SFN is not disturbed. - - fprintf(stderr, "F"); - nanosleep(&hundred_nano, NULL); - write_fail_count++; - if (write_fail_count >= 3) { - double ts = md.time_spec.get_real_secs(); - double t_usrp = uwd->myUsrp->get_time_now().get_real_secs(); - - fprintf(stderr, "*** USRP write fail count %d\n", write_fail_count); - fprintf(stderr, "*** delta %f, md.time_spec %f, usrp->now %f\n", - ts - t_usrp, - ts, t_usrp); - - fprintf(stderr, "UHDWorker::process() unable to write to device, skipping frame!\n"); - break; - } + //send a single packet + size_t num_tx_samps = myTxStream->send( + &in_data[num_acc_samps], + samps_to_send, md, tx_timeout); #endif - } -#if FAKE_UHD == 0 - uhd::async_metadata_t async_md; - if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) { - const char* uhd_async_message = ""; - bool failure = false; - switch (async_md.event_code) { - case uhd::async_metadata_t::EVENT_CODE_BURST_ACK: - break; - case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW: - uhd_async_message = "Underflow"; - num_underflows++; - break; - case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR: - uhd_async_message = "Packet loss between host and device."; - failure = true; - break; - case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR: - uhd_async_message = "Packet had time that was late."; - num_late_packets++; - break; - case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET: - uhd_async_message = "Underflow occurred inside a packet."; - failure = true; - break; - case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST: - uhd_async_message = "Packet loss within a burst."; - failure = true; - break; - default: - uhd_async_message = "unknown event code"; - failure = true; - break; - } + num_acc_samps += num_tx_samps; - if (failure) { - etiLog.level(alert) << "Near frame " << - frame->ts.fct << ": Received Async UHD Message '" << - uhd_async_message << "'"; + md.time_spec = uhd::time_spec_t(tx_second, pps_offset) + + uhd::time_spec_t(0, num_acc_samps/uwd->sampleRate); - } - } -#endif + if (num_tx_samps == 0) { + etiLog.log(warn, + "UHDWorker::process() unable to write to device, skipping frame!\n"); + break; } - if (last_pps > pps_offset) { - if (num_underflows or num_late_packets) { - etiLog.log(info, - "OutputUHD status (usrp time: %f): " - "%d underruns and %d late packets since last status.\n", - usrp_time, - num_underflows, num_late_packets); - } - num_underflows = 0; - num_late_packets = 0; - } + print_async_metadata(frame); + } +} +void UHDWorker::print_async_metadata(const struct UHDWorkerFrameData *frame) +{ +#if FAKE_UHD == 0 + uhd::async_metadata_t async_md; + if (uwd->myUsrp->get_device()->recv_async_msg(async_md, 0)) { + const char* uhd_async_message = ""; + bool failure = false; + switch (async_md.event_code) { + case uhd::async_metadata_t::EVENT_CODE_BURST_ACK: + break; + case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW: + uhd_async_message = "Underflow"; + num_underflows++; + break; + case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR: + uhd_async_message = "Packet loss between host and device."; + failure = true; + break; + case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR: + uhd_async_message = "Packet had time that was late."; + num_late_packets++; + break; + case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET: + uhd_async_message = "Underflow occurred inside a packet."; + failure = true; + break; + case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST: + uhd_async_message = "Packet loss within a burst."; + failure = true; + break; + default: + uhd_async_message = "unknown event code"; + failure = true; + break; + } - last_pps = pps_offset; + if (failure) { + etiLog.level(alert) << "Near frame " << + frame->ts.fct << ": Received Async UHD Message '" << + uhd_async_message << "'"; -loopend: - // swap buffers - workerbuffer = (workerbuffer + 1) % 2; + } } +#endif } diff --git a/src/OutputUHD.h b/src/OutputUHD.h index bf5b27d..49489e3 100644 --- a/src/OutputUHD.h +++ b/src/OutputUHD.h @@ -160,9 +160,35 @@ class UHDWorker { } private: + // Asynchronous message statistics + int num_underflows; + int num_late_packets; + + bool fct_discontinuity; + int expected_next_fct; + uhd::tx_metadata_t md; + time_t tx_second; + double pps_offset; + double last_pps; + + // GPS Fix check variables + double last_gps_fix_check; + int num_checks_without_gps_fix; + boost::packaged_task gps_fix_pt; + boost::unique_future gps_fix_future; + boost::thread gps_fix_task; + + + // Transmit timeout + static const double tx_timeout = 20.0; + void process(); void process_errhandler(); + void print_async_metadata(const struct UHDWorkerFrameData *frame); + + void handle_frame(const struct UHDWorkerFrameData *frame); + void tx_frame(const struct UHDWorkerFrameData *frame); struct UHDWorkerData *uwd; boost::thread uhd_thread; -- cgit v1.2.3