summaryrefslogtreecommitdiffstats
path: root/src/Flowgraph.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-20 01:20:41 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-20 01:20:41 +0100
commit28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d (patch)
treef22941e989bb775aacda52876c97ada7b899a7dd /src/Flowgraph.cpp
parent95f556cf0797ab4c23f431e5c8c5accfa7f4c30b (diff)
parentf52b0e13f61a947c26236504ffb4b072352abc04 (diff)
downloaddabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.tar.gz
dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.tar.bz2
dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.zip
Merge branch 'outputRefactoring' into next
Diffstat (limited to 'src/Flowgraph.cpp')
-rw-r--r--src/Flowgraph.cpp73
1 files changed, 62 insertions, 11 deletions
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index 465ef41..506832c 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include "Flowgraph.h"
#include "PcDebug.h"
+#include "Log.h"
#include <string>
#include <memory>
#include <algorithm>
@@ -57,9 +58,10 @@ Node::~Node()
assert(myOutputBuffers.size() == 0);
}
-void Node::addOutputBuffer(Buffer::sptr& buffer)
+void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myOutputBuffers.push_back(buffer);
+ myOutputMetadata.push_back(md);
#if DEBUG
std::string fname = string(myPlugin->name()) +
"-" + to_string(myDebugFiles.size()) +
@@ -71,7 +73,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer)
#endif
}
-void Node::removeOutputBuffer(Buffer::sptr& buffer)
+void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myOutputBuffers.begin(),
@@ -89,14 +91,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer)
#endif
myOutputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myOutputMetadata.begin(),
+ myOutputMetadata.end(),
+ md);
+ if (mdit != myOutputMetadata.end()) {
+ myOutputMetadata.erase(mdit);
+ }
}
-void Node::addInputBuffer(Buffer::sptr& buffer)
+void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myInputBuffers.push_back(buffer);
+ myInputMetadata.push_back(md);
}
-void Node::removeInputBuffer(Buffer::sptr& buffer)
+void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myInputBuffers.begin(),
@@ -105,6 +116,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer)
if (it != myInputBuffers.end()) {
myInputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myInputMetadata.begin(),
+ myInputMetadata.end(),
+ md);
+ if (mdit != myInputMetadata.end()) {
+ myInputMetadata.erase(mdit);
+ }
}
int Node::process()
@@ -127,6 +146,37 @@ int Node::process()
}
int ret = myPlugin->process(inBuffers, outBuffers);
+
+ // Collect all incoming metadata into a single vector
+ meta_vec_t all_input_mds;
+ for (auto& md_vec_sp : myInputMetadata) {
+ if (md_vec_sp) {
+ move(md_vec_sp->begin(), md_vec_sp->end(),
+ back_inserter(all_input_mds));
+ md_vec_sp->clear();
+ }
+ }
+
+ auto mod_meta = dynamic_pointer_cast<ModMetadata>(myPlugin);
+ if (mod_meta) {
+ auto outputMetadata = mod_meta->process_metadata(all_input_mds);
+ // Distribute the result metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(outputMetadata.begin(), outputMetadata.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+ else {
+ // Propagate the unmodified input metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(all_input_mds.begin(), all_input_mds.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+
+
#if DEBUG
assert(myDebugFiles.size() == myOutputBuffers.size());
@@ -158,8 +208,10 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
this);
myBuffer = make_shared<Buffer>();
- srcNode->addOutputBuffer(myBuffer);
- dstNode->addInputBuffer(myBuffer);
+ myMetadata = make_shared<vector<flowgraph_metadata> >();
+
+ srcNode->addOutputBuffer(myBuffer, myMetadata);
+ dstNode->addInputBuffer(myBuffer, myMetadata);
}
@@ -168,8 +220,8 @@ Edge::~Edge()
PDEBUG("Edge::~Edge() @ %p\n", this);
if (myBuffer) {
- mySrcNode->removeOutputBuffer(myBuffer);
- myDstNode->removeInputBuffer(myBuffer);
+ mySrcNode->removeOutputBuffer(myBuffer, myMetadata);
+ myDstNode->removeInputBuffer(myBuffer, myMetadata);
}
}
@@ -186,9 +238,8 @@ Flowgraph::~Flowgraph()
{
PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this);
- stringstream ss;
-
if (myProcessTime) {
+ stringstream ss;
ss << "Process time:\n";
char node_time_sz[1024] = {};