aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/utils/tasks.cpp
blob: 661315ae8934dfb943a95d45bffa783077eae43b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
//
// Copyright 2011,2014 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
//

#include <uhd/utils/tasks.hpp>
#include <uhd/utils/msg_task.hpp>
#include <uhd/utils/msg.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <exception>
#include <iostream>
#include <vector>

using namespace uhd;

class task_impl : public task{
public:

    task_impl(const task_fcn_type &task_fcn):
        _spawn_barrier(2)
    {
        (void)_thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn));
        _spawn_barrier.wait();
    }

    ~task_impl(void){
        _running = false;
        _thread_group.interrupt_all();
        _thread_group.join_all();
    }

private:

    void task_loop(const task_fcn_type &task_fcn){
        _running = true;
        _spawn_barrier.wait();

        try{
            while (_running){
                task_fcn();
            }
        }
        catch(const boost::thread_interrupted &){
            //this is an ok way to exit the task loop
        }
        catch(const std::exception &e){
            do_error_msg(e.what());
        }
        catch(...){
            //FIXME
            //Unfortunately, this is also an ok way to end a task,
            //because on some systems boost throws uncatchables.
        }
    }

    void do_error_msg(const std::string &msg){
        UHD_MSG(error)
            << "An unexpected exception was caught in a task loop." << std::endl
            << "The task loop will now exit, things may not work." << std::endl
            << msg << std::endl
        ;
    }

    boost::thread_group _thread_group;
    boost::barrier _spawn_barrier;
    bool _running;
};

task::sptr task::make(const task_fcn_type &task_fcn){
    return task::sptr(new task_impl(task_fcn));
}

msg_task::~msg_task(void){
    /* NOP */
}

/*
 * During shutdown pointers to queues for radio_ctrl_core might not be available anymore.
 * msg_task_impl provides a dump_queue for such messages.
 * ctrl_cores can check this queue for stranded messages.
 */

class msg_task_impl : public msg_task{
public:

    msg_task_impl(const task_fcn_type &task_fcn):
        _spawn_barrier(2)
    {
        (void)_thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn));
        _spawn_barrier.wait();
    }

    ~msg_task_impl(void){
        _running = false;
        _thread_group.interrupt_all();
        _thread_group.join_all();
    }

    /*
     * Returns the first message for the given SID.
     * This way a radio_ctrl_core doesn't have to die in timeout but can check for stranded messages here.
     * This might happen during shutdown when dtors are called.
     * See also: comments in b200_io_impl->handle_async_task
     */
    msg_payload_t get_msg_from_dump_queue(uint32_t sid)
    {
        boost::mutex::scoped_lock lock(_mutex);
        msg_payload_t b;
        for (size_t i = 0; i < _dump_queue.size(); i++) {
            if (sid == _dump_queue[i].first) {
                b = _dump_queue[i].second;
                _dump_queue.erase(_dump_queue.begin() + i);
                break;
            }
        }
        return b;
    }

private:

    void task_loop(const task_fcn_type &task_fcn){
        _running = true;
        _spawn_barrier.wait();

        try{
            while (_running){
            	boost::optional<msg_type_t> buff = task_fcn();
            	if(buff != boost::none){
            	    /*
            	     * If a message gets stranded it is returned by task_fcn and then pushed to the dump_queue.
            	     * This way ctrl_cores can check dump_queue for missing messages.
            	     */
            	    boost::mutex::scoped_lock lock(_mutex);
            	    _dump_queue.push_back(buff.get() );
            	}
            }
        }
        catch(const boost::thread_interrupted &){
            //this is an ok way to exit the task loop
        }
        catch(const std::exception &e){
            do_error_msg(e.what());
        }
        catch(...){
            //FIXME
            //Unfortunately, this is also an ok way to end a task,
            //because on some systems boost throws uncatchables.
        }
    }

    void do_error_msg(const std::string &msg){
        UHD_MSG(error)
            << "An unexpected exception was caught in a task loop." << std::endl
            << "The task loop will now exit, things may not work." << std::endl
            << msg << std::endl
        ;
    }

    boost::mutex _mutex;
    boost::thread_group _thread_group;
    boost::barrier _spawn_barrier;
    bool _running;

    /*
     * This queue holds stranded messages until a radio_ctrl_core grabs them via 'get_msg_from_dump_queue'.
     */
    std::vector <msg_type_t> _dump_queue;
};

msg_task::sptr msg_task::make(const task_fcn_type &task_fcn){
    return msg_task::sptr(new msg_task_impl(task_fcn));
}