/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2015
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see .
*/
#include "Flowgraph.h"
#include "PcDebug.h"
#if HAVE_DECL__MM_MALLOC
# include
#else
# define memalign(a, b) malloc(b)
#endif
#include
#include
#include
#if defined(_WIN32) and !defined(__MINGW32__)
//#include
//#include
#else
#include
#endif
using namespace boost;
typedef std::vector >::iterator NodeIterator;
typedef std::vector >::iterator EdgeIterator;
Node::Node(shared_ptr plugin) :
myPlugin(plugin),
myProcessTime(0)
{
PDEBUG("Node::Node(plugin(%s): %p) @ %p\n",
plugin->name(), plugin.get(), this);
}
Node::~Node()
{
PDEBUG("Node::~Node() @ %p\n", this);
assert(myInputBuffers.size() == 0);
assert(myOutputBuffers.size() == 0);
}
Edge::Edge(shared_ptr& srcNode, shared_ptr& dstNode) :
mySrcNode(srcNode),
myDstNode(dstNode)
{
PDEBUG("Edge::Edge(srcNode(%s): %p, dstNode(%s): %p) @ %p\n",
srcNode->plugin()->name(), srcNode.get(),
dstNode->plugin()->name(), dstNode.get(),
this);
myBuffer = shared_ptr(new Buffer());
srcNode->myOutputBuffers.push_back(myBuffer);
dstNode->myInputBuffers.push_back(myBuffer);
}
Edge::~Edge()
{
PDEBUG("Edge::~Edge() @ %p\n", this);
std::vector >::iterator buffer;
if (myBuffer != NULL) {
for (buffer = mySrcNode->myOutputBuffers.begin();
buffer != mySrcNode->myOutputBuffers.end();
++buffer) {
if (*buffer == myBuffer) {
mySrcNode->myOutputBuffers.erase(buffer);
break;
}
}
for (buffer = myDstNode->myInputBuffers.begin();
buffer != myDstNode->myInputBuffers.end();
++buffer) {
if (*buffer == myBuffer) {
myDstNode->myInputBuffers.erase(buffer);
break;
}
}
}
}
int Node::process()
{
PDEBUG("Edge::process()\n");
PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin.get());
// the plugin process() still wants vector
// arguments.
std::vector inBuffers;
std::vector >::iterator buffer;
for (buffer = myInputBuffers.begin();
buffer != myInputBuffers.end();
++buffer) {
inBuffers.push_back(buffer->get());
}
std::vector outBuffers;
for (buffer = myOutputBuffers.begin();
buffer != myOutputBuffers.end();
++buffer) {
outBuffers.push_back(buffer->get());
}
return myPlugin->process(inBuffers, outBuffers);
}
Flowgraph::Flowgraph() :
myProcessTime(0)
{
PDEBUG("Flowgraph::Flowgraph() @ %p\n", this);
}
Flowgraph::~Flowgraph()
{
PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);
if (myProcessTime) {
fprintf(stderr, "Process time:\n");
std::vector >::const_iterator node;
for (node = nodes.begin(); node != nodes.end(); ++node) {
fprintf(stderr, " %30s: %10u us (%2.2f %%)\n",
(*node)->plugin()->name(),
(unsigned)(*node)->processTime(),
(*node)->processTime() * 100.0 / myProcessTime);
}
fprintf(stderr, " %30s: %10u us (100.00 %%)\n", "total",
(unsigned)myProcessTime);
}
}
void Flowgraph::connect(shared_ptr input, shared_ptr output)
{
PDEBUG("Flowgraph::connect(input(%s): %p, output(%s): %p)\n",
input->name(), input.get(), output->name(), output.get());
NodeIterator inputNode;
NodeIterator outputNode;
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
if (inputNode == nodes.end()) {
inputNode = nodes.insert(nodes.end(), shared_ptr(new Node(input)));
}
for (outputNode = nodes.begin(); outputNode != nodes.end(); ++outputNode) {
if ((*outputNode)->plugin() == output) {
break;
}
}
if (outputNode == nodes.end()) {
outputNode = nodes.insert(nodes.end(), shared_ptr(new Node(output)));
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
} else if (inputNode > outputNode) {
shared_ptr node = *outputNode;
nodes.erase(outputNode);
outputNode = nodes.insert(nodes.end(), node);
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
}
assert((*inputNode)->plugin() == input);
assert((*outputNode)->plugin() == output);
edges.push_back(shared_ptr(new Edge(*inputNode, *outputNode)));
}
bool Flowgraph::run()
{
PDEBUG("Flowgraph::run()\n");
std::vector >::const_iterator node;
timeval start, stop;
time_t diff;
gettimeofday(&start, NULL);
for (node = nodes.begin(); node != nodes.end(); ++node) {
int ret = (*node)->process();
PDEBUG(" ret: %i\n", ret);
gettimeofday(&stop, NULL);
diff = (stop.tv_sec - start.tv_sec) * 1000000 +
stop.tv_usec - start.tv_usec;
myProcessTime += diff;
(*node)->addProcessTime(diff);
start = stop;
if (!ret) {
return false;
}
}
return true;
}