1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include "Buffer.h"
#include "ThreadsafeQueue.h"
#include <cstddef>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
// All flowgraph elements derive from ModPlugin, or a variant of it.
// Some ModPlugins also support handling metadata.
struct frame_timestamp;
struct flowgraph_metadata {
std::shared_ptr<struct frame_timestamp> ts;
};
using meta_vec_t = std::vector<flowgraph_metadata>;
/* ModPlugins that support metadata derive from ModMetadata */
class ModMetadata {
public:
// Receives metadata from all inputs, and process them, and output
// a sequence of metadata.
virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0;
};
/* Abstract base class for all flowgraph elements */
class ModPlugin
{
public:
virtual int process(
std::vector<Buffer*> dataIn,
std::vector<Buffer*> dataOut) = 0;
virtual const char* name() = 0;
virtual ~ModPlugin() = default;
};
/* Inputs are sources, the output buffers without reading any */
class ModInput : public ModPlugin
{
public:
virtual int process(
std::vector<Buffer*> dataIn,
std::vector<Buffer*> dataOut);
virtual int process(Buffer* dataOut) = 0;
};
/* Codecs are 1-input 1-output flowgraph plugins */
class ModCodec : public ModPlugin
{
public:
virtual int process(
std::vector<Buffer*> dataIn,
std::vector<Buffer*> dataOut);
virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0;
};
/* Pipelined ModCodecs run their processing in a separate thread, and
* have a one-call-to-process() latency. Because of this latency, they
* must also handle the metadata
*/
class PipelinedModCodec : public ModCodec, public ModMetadata
{
public:
virtual int process(Buffer* const dataIn, Buffer* dataOut) final;
virtual const char* name() = 0;
virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final;
protected:
// Once the instance implementing PipelinedModCodec has been constructed,
// it must call start_pipeline_thread()
void start_pipeline_thread(void);
// To avoid race conditions on teardown, plugins must call
// stop_pipeline_thread in their destructor.
void stop_pipeline_thread(void);
// The real processing must be implemented in internal_process
virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0;
private:
bool m_ready_to_output_data = false;
ThreadsafeQueue<Buffer> m_input_queue;
ThreadsafeQueue<Buffer> m_output_queue;
std::deque<meta_vec_t> m_metadata_fifo;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::thread m_thread;
void process_thread(void);
};
/* Muxes are N-input 1-output flowgraph plugins */
class ModMux : public ModPlugin
{
public:
virtual int process(
std::vector<Buffer*> dataIn,
std::vector<Buffer*> dataOut);
virtual int process(std::vector<Buffer*> dataIn, Buffer* dataOut) = 0;
};
/* Outputs do not create any output buffers */
class ModOutput : public ModPlugin
{
public:
virtual int process(
std::vector<Buffer*> dataIn,
std::vector<Buffer*> dataOut);
virtual int process(Buffer* dataIn) = 0;
};
|