// // Copyright 2011,2014 Ettus Research LLC // Copyright 2018 Ettus Research, a National Instruments Company // // SPDX-License-Identifier: GPL-3.0-or-later // #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace uhd; class task_impl : public task { public: task_impl(const task_fcn_type& task_fcn, const std::string& name) : _exit(false) { _task = std::thread([this, task_fcn]() { this->task_loop(task_fcn); }); if (not name.empty()) { set_thread_name(&_task, name); } } ~task_impl(void) { _exit = true; if (_task.joinable()) { _task.join(); } } private: void task_loop(const task_fcn_type& task_fcn) { try { while (!_exit) { task_fcn(); } } catch (const std::exception& e) { do_error_msg(e.what()); } catch (...) { UHD_THROW_INVALID_CODE_PATH(); } } void do_error_msg(const std::string& msg) { UHD_LOGGER_ERROR("UHD") << "An unexpected exception was caught in a task loop." << "The task loop will now exit, things may not work." << msg; } std::atomic _exit; std::thread _task; }; task::sptr task::make(const task_fcn_type& task_fcn, const std::string& name) { return task::sptr(new task_impl(task_fcn, name)); } msg_task::~msg_task(void) { /* NOP */ } /* * During shutdown pointers to queues for radio_ctrl_core might not be available anymore. * msg_task_impl provides a dump_queue for such messages. * ctrl_cores can check this queue for stranded messages. */ class msg_task_impl : public msg_task { public: msg_task_impl(const task_fcn_type& task_fcn) : _spawn_barrier(2) { (void)_thread_group.create_thread( std::bind(&msg_task_impl::task_loop, this, task_fcn)); _spawn_barrier.wait(); } ~msg_task_impl(void) override { _running = false; _thread_group.interrupt_all(); _thread_group.join_all(); } /* * Returns the first message for the given SID. * This way a radio_ctrl_core doesn't have to die in timeout but can check for * stranded messages here. This might happen during shutdown when dtors are called. * See also: comments in b200_io_impl->handle_async_task */ msg_payload_t get_msg_from_dump_queue(uint32_t sid) override { std::lock_guard lock(_mutex); msg_payload_t b; for (size_t i = 0; i < _dump_queue.size(); i++) { if (sid == _dump_queue[i].first) { b = _dump_queue[i].second; _dump_queue.erase(_dump_queue.begin() + i); break; } } return b; } private: void task_loop(const task_fcn_type& task_fcn) { _running = true; _spawn_barrier.wait(); try { while (_running) { boost::optional buff = task_fcn(); if (buff != boost::none) { /* * If a message gets stranded it is returned by task_fcn and then * pushed to the dump_queue. This way ctrl_cores can check dump_queue * for missing messages. */ std::lock_guard lock(_mutex); _dump_queue.push_back(buff.get()); } } } catch (const boost::thread_interrupted&) { // this is an ok way to exit the task loop } catch (const std::exception& e) { do_error_msg(e.what()); } catch (...) { // FIXME // Unfortunately, this is also an ok way to end a task, // because on some systems boost throws uncatchables. } } void do_error_msg(const std::string& msg) { UHD_LOGGER_ERROR("UHD") << "An unexpected exception was caught in a task loop." << "The task loop will now exit, things may not work." << msg; } std::mutex _mutex; boost::thread_group _thread_group; boost::barrier _spawn_barrier; bool _running; /* * This queue holds stranded messages until a radio_ctrl_core grabs them via * 'get_msg_from_dump_queue'. */ std::vector _dump_queue; }; msg_task::sptr msg_task::make(const task_fcn_type& task_fcn) { return msg_task::sptr(new msg_task_impl(task_fcn)); }