/*
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
ODR-DabMod is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMod is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Flowgraph.h"
#include "PcDebug.h"
#include "Log.h"
#include "TimestampDecoder.h"
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
// Iterator aliases for the flowgraph's containers. NodeIterator is the cursor
// type used by Flowgraph::connect() to search `nodes`; EdgeIterator is not
// referenced by the code in this part of the file.
using NodeIterator = std::vector >::iterator;
using EdgeIterator = std::vector >::iterator;
// Wrap a plugin in a flowgraph node.
//
// The shared pointer is stored in myPlugin and the node's accumulated
// processing time starts at zero. The plugin must be non-null: its name() is
// dereferenced for the debug trace below.
Node::Node(shared_ptr plugin) :
myPlugin(plugin),
myProcessTime(0)
{
PDEBUG("Node::Node(plugin(%s): %p) @ %p\n",
plugin->name(), plugin.get(), this);
}
// Destroy the node.
//
// Edges unregister their buffers from both end nodes in Edge::~Edge(), so by
// the time a node goes away no input or output buffer may still be attached.
Node::~Node()
{
    PDEBUG("Node::~Node() @ %p\n", this);

    assert(myInputBuffers.empty());
    assert(myOutputBuffers.empty());
}
// Register an outgoing buffer and its metadata vector on this node.
//
// In DEBUG builds a dump file is opened for the new output as well; its name
// is "<plugin name>-<index of the dump file>-<plugin address>.dat", and
// process() appends the buffer contents to it on every call.
void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
    myOutputBuffers.push_back(buffer);
    myOutputMetadata.push_back(md);

#if DEBUG
    // The plugin address makes the name unique when the same plugin type is
    // instantiated several times.
    std::string dumpName(myPlugin->name());
    dumpName += "-";
    dumpName += std::to_string(myDebugFiles.size());
    dumpName += "-";
    dumpName += std::to_string(reinterpret_cast<size_t>(myPlugin.get()));
    dumpName += ".dat";

    FILE* const dumpFile = fopen(dumpName.c_str(), "wb");
    assert(dumpFile != nullptr);
    myDebugFiles.push_back(dumpFile);
#endif
}
// Unregister an outgoing buffer and its metadata vector from this node.
//
// Each of the two is looked up independently and removed only if present, so
// calling this for an unknown buffer or metadata pointer is a no-op. In DEBUG
// builds the dump file at the same position as the buffer is closed and
// dropped together with it.
void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
    const auto bufferPos = std::find(
            myOutputBuffers.begin(), myOutputBuffers.end(), buffer);
    const auto metadataPos = std::find(
            myOutputMetadata.begin(), myOutputMetadata.end(), md);

    if (bufferPos != myOutputBuffers.end()) {
#if DEBUG
        // Dump files are kept in the same order as the output buffers, so the
        // buffer's index identifies its file.
        auto dumpPos = myDebugFiles.begin();
        std::advance(dumpPos,
                std::distance(myOutputBuffers.begin(), bufferPos));
        fclose(*dumpPos);
        myDebugFiles.erase(dumpPos);
#endif
        myOutputBuffers.erase(bufferPos);
    }

    if (metadataPos != myOutputMetadata.end()) {
        myOutputMetadata.erase(metadataPos);
    }
}
// Register an incoming buffer and its metadata vector on this node.
// Both are appended, so inputs keep the order in which they were connected.
void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
    myInputBuffers.emplace_back(buffer);
    myInputMetadata.emplace_back(md);
}
// Unregister an incoming buffer and its metadata vector from this node.
//
// Each of the two is looked up independently and removed only if present, so
// calling this for an unknown buffer or metadata pointer is a no-op.
void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
    const auto bufferPos = std::find(
            myInputBuffers.begin(), myInputBuffers.end(), buffer);
    const auto metadataPos = std::find(
            myInputMetadata.begin(), myInputMetadata.end(), md);

    if (bufferPos != myInputBuffers.end()) {
        myInputBuffers.erase(bufferPos);
    }

    if (metadataPos != myInputMetadata.end()) {
        myInputMetadata.erase(metadataPos);
    }
}
// Run the plugin once on this node's buffers and forward the metadata.
//
// Raw pointers to all input and output buffers are handed to the plugin's
// process(); its return value is returned unchanged to the caller. After the
// plugin has run, every output metadata vector is cleared and refilled, either
// with the result of the plugin's process_metadata() or, when the plugin does
// not pass the dynamic_pointer_cast below, with the concatenated input
// metadata.
int Node::process()
{
PDEBUG("Node::process()\n");
PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin.get());
// The plugin's process() takes vectors of raw buffer pointers, so unwrap
// the shared pointers held by the node. Every registered buffer must be
// non-null.
std::vector inBuffers;
for (auto& buffer : myInputBuffers) {
assert(buffer.get() != nullptr);
inBuffers.push_back(buffer.get());
}
std::vector outBuffers;
for (auto& buffer : myOutputBuffers) {
assert(buffer.get() != nullptr);
outBuffers.push_back(buffer.get());
}
int ret = myPlugin->process(inBuffers, outBuffers);
// Collect all incoming metadata into a single vector; null metadata
// pointers on the input side are skipped.
meta_vec_t all_input_mds;
for (auto& md_vec_sp : myInputMetadata) {
if (md_vec_sp) {
copy(md_vec_sp->begin(), md_vec_sp->end(),
back_inserter(all_input_mds));
}
}
// Plugins for which this cast succeeds get to transform the metadata
// through process_metadata(); for all others it is passed through as is.
auto mod_meta = dynamic_pointer_cast(myPlugin);
if (mod_meta) {
auto outputMetadata = mod_meta->process_metadata(all_input_mds);
// Distribute the result metadata to all outputs
// NOTE(review): std::move() is applied to the same source range once per
// output. With more than one output, the second and later outputs are
// filled from already moved-from elements - confirm whether a copy is
// intended here.
// NOTE(review): unlike the inputs above, output metadata pointers are
// dereferenced without a null check.
for (auto& out_md : myOutputMetadata) {
out_md->clear();
std::move(outputMetadata.begin(), outputMetadata.end(),
std::back_inserter(*out_md));
}
}
else {
// Propagate the unmodified input metadata to all outputs
// NOTE(review): same repeated std::move() from one source range as in
// the branch above.
for (auto& out_md : myOutputMetadata) {
out_md->clear();
std::move(all_input_mds.begin(), all_input_mds.end(),
std::back_inserter(*out_md));
}
}
#if DEBUG
// Append the current contents of every output buffer to its dump file.
// The files are opened in addOutputBuffer(), one per output and in the
// same order, hence the size check and the two iterators advancing in
// lockstep.
assert(myDebugFiles.size() == myOutputBuffers.size());
auto buf = myOutputBuffers.begin();
auto fd_it = myDebugFiles.begin();
for (size_t i = 0; i < myDebugFiles.size(); i++) {
if (*buf) {
Buffer& b = *buf->get();
FILE* fd = *fd_it;
fwrite(b.getData(), b.getLength(), 1, fd);
}
++buf;
++fd_it;
}
#endif
return ret;
}
// Connect two nodes.
//
// The edge keeps references to both end nodes, creates the buffer and the
// metadata vector that travel along it, and registers that pair as an output
// of srcNode and as an input of dstNode. Edge::~Edge() undoes the
// registration. Both nodes must be non-null: they are dereferenced below.
Edge::Edge(shared_ptr& srcNode, shared_ptr& dstNode) :
mySrcNode(srcNode),
myDstNode(dstNode)
{
PDEBUG("Edge::Edge(srcNode(%s): %p, dstNode(%s): %p) @ %p\n",
srcNode->plugin()->name(), srcNode.get(),
dstNode->plugin()->name(), dstNode.get(),
this);
// One buffer and one metadata vector are shared by both ends of the edge.
myBuffer = make_shared();
myMetadata = make_shared >();
srcNode->addOutputBuffer(myBuffer, myMetadata);
dstNode->addInputBuffer(myBuffer, myMetadata);
}
// Disconnect the edge: unregister its buffer and metadata vector from the
// source node's outputs and from the destination node's inputs. Nothing is
// unregistered when the edge holds no buffer.
Edge::~Edge()
{
    PDEBUG("Edge::~Edge() @ %p\n", this);

    if (!myBuffer) {
        return;
    }

    mySrcNode->removeOutputBuffer(myBuffer, myMetadata);
    myDstNode->removeInputBuffer(myBuffer, myMetadata);
}
Flowgraph::Flowgraph() :
myProcessTime(0)
{
PDEBUG("Flowgraph::Flowgraph() @ %p\n", this);
}
// Destroy the flowgraph and, if any processing time was recorded, print a
// per-node timing summary to stderr.
//
// The summary lists, for every node, the plugin name, the accumulated time in
// microseconds and its share of the total, followed by a "total" line. The
// report is assembled completely before it is written, so it reaches stderr in
// a single fprintf() call.
Flowgraph::~Flowgraph()
{
    PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);

    // Nothing was measured (run() never accumulated any time): stay silent.
    // This also keeps the percentage computation below from dividing by zero.
    if (!myProcessTime) {
        return;
    }

    std::stringstream report;
    report << "Process time:\n";

    char line[1024] = {};
    for (const auto& node : nodes) {
        // The counters are fed from time_t values in run(); convert
        // explicitly so the argument type matches %lu regardless of how
        // the counter type is defined on the target platform.
        snprintf(line, sizeof(line), " %30s: %10lu us (%2.2f %%)\n",
                node->plugin()->name(),
                static_cast<unsigned long>(node->processTime()),
                node->processTime() * 100.0 / myProcessTime);
        report << line;
    }

    snprintf(line, sizeof(line), " %30s: %10lu us (100.00 %%)\n", "total",
            static_cast<unsigned long>(myProcessTime));
    report << line;

    fprintf(stderr, "%s", report.str().c_str());
}
// Add an edge from the node wrapping `input` to the node wrapping `output`.
//
// A node is created for either plugin if no existing node holds it. The
// function also makes sure the output node is located after the input node in
// `nodes`, which is the order in which run() processes them. Both plugins must
// be non-null: they are dereferenced for the debug trace below.
void Flowgraph::connect(shared_ptr input, shared_ptr output)
{
PDEBUG("Flowgraph::connect(input(%s): %p, output(%s): %p)\n",
input->name(), input.get(), output->name(), output.get());
NodeIterator inputNode;
NodeIterator outputNode;
// Look for an existing node for the input plugin, append one if none exists.
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
if (inputNode == nodes.end()) {
inputNode = nodes.insert(nodes.end(), make_shared(input));
}
// Same lookup for the output plugin.
for (outputNode = nodes.begin(); outputNode != nodes.end(); ++outputNode) {
if ((*outputNode)->plugin() == output) {
break;
}
}
if (outputNode == nodes.end()) {
// New output node: appending it puts it after the input node. The insert
// may invalidate inputNode (NodeIterator is a std::vector iterator), so
// the input node is located again.
outputNode = nodes.insert(nodes.end(), make_shared(output));
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
} else if (inputNode > outputNode) {
// Existing output node sits before the input node: move it to the end.
// erase() shifts the elements behind it, so inputNode has to be located
// again afterwards.
// NOTE(review): only the output node is moved; nodes that already consume
// from it keep their position and may now come before it - confirm that
// callers connect plugins in an order where this cannot happen.
shared_ptr node = *outputNode;
nodes.erase(outputNode);
outputNode = nodes.insert(nodes.end(), node);
for (inputNode = nodes.begin(); inputNode != nodes.end(); ++inputNode) {
if ((*inputNode)->plugin() == input) {
break;
}
}
}
// Both iterators must now point at the nodes of the requested plugins.
assert((*inputNode)->plugin() == input);
assert((*outputNode)->plugin() == output);
edges.push_back(make_shared(*inputNode, *outputNode));
}
// Process every node once, in the order they are stored in `nodes`.
//
// The wall-clock time spent in each node's process() is measured with
// gettimeofday() and added, in microseconds, both to the node's own counter
// and to the flowgraph total.
//
// Returns false as soon as a node's process() returns 0 (the remaining nodes
// are not run), true once all nodes have been processed.
bool Flowgraph::run()
{
    PDEBUG("Flowgraph::run()\n");

    timeval tic;
    gettimeofday(&tic, nullptr);

    for (const auto& node : nodes) {
        const int ret = node->process();
        PDEBUG(" ret: %i\n", ret);

        timeval toc;
        gettimeofday(&toc, nullptr);

        const time_t elapsed_us =
            (toc.tv_sec - tic.tv_sec) * 1000000 +
            toc.tv_usec - tic.tv_usec;
        myProcessTime += elapsed_us;
        node->addProcessTime(elapsed_us);

        // The end of this measurement is the start of the next one.
        tic = toc;

        if (ret == 0) {
            return false;
        }
    }

    return true;
}