summaryrefslogtreecommitdiffstats
path: root/src/Flowgraph.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-07 10:38:55 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-07 10:38:55 +0100
commit8953a94893fe39a10b044ea62cf6971d36801880 (patch)
tree0cd0ab921827400f3ada7449827bbffaa97e7450 /src/Flowgraph.cpp
parente0f9c8909ecba56da4c7a2ec3507b8af19b737bd (diff)
parent0315433aef00644085d2278af405eaedbc184c5c (diff)
downloaddabmod-8953a94893fe39a10b044ea62cf6971d36801880.tar.gz
dabmod-8953a94893fe39a10b044ea62cf6971d36801880.tar.bz2
dabmod-8953a94893fe39a10b044ea62cf6971d36801880.zip
Merge branch 'flowgraphmetadata' into outputRefactoring
Diffstat (limited to 'src/Flowgraph.cpp')
-rw-r--r--src/Flowgraph.cpp68
1 files changed, 60 insertions, 8 deletions
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index 465ef41..0b80a8c 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -26,6 +26,8 @@
#include "Flowgraph.h"
#include "PcDebug.h"
+#include "Log.h"
+#include "TimestampDecoder.h"
#include <string>
#include <memory>
#include <algorithm>
@@ -57,9 +59,10 @@ Node::~Node()
assert(myOutputBuffers.size() == 0);
}
-void Node::addOutputBuffer(Buffer::sptr& buffer)
+void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myOutputBuffers.push_back(buffer);
+ myOutputMetadata.push_back(md);
#if DEBUG
std::string fname = string(myPlugin->name()) +
"-" + to_string(myDebugFiles.size()) +
@@ -71,7 +74,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer)
#endif
}
-void Node::removeOutputBuffer(Buffer::sptr& buffer)
+void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myOutputBuffers.begin(),
@@ -89,14 +92,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer)
#endif
myOutputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myOutputMetadata.begin(),
+ myOutputMetadata.end(),
+ md);
+ if (mdit != myOutputMetadata.end()) {
+ myOutputMetadata.erase(mdit);
+ }
}
-void Node::addInputBuffer(Buffer::sptr& buffer)
+void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
myInputBuffers.push_back(buffer);
+ myInputMetadata.push_back(md);
}
-void Node::removeInputBuffer(Buffer::sptr& buffer)
+void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)
{
auto it = std::find(
myInputBuffers.begin(),
@@ -105,6 +117,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer)
if (it != myInputBuffers.end()) {
myInputBuffers.erase(it);
}
+
+ auto mdit = std::find(
+ myInputMetadata.begin(),
+ myInputMetadata.end(),
+ md);
+ if (mdit != myInputMetadata.end()) {
+ myInputMetadata.erase(mdit);
+ }
}
int Node::process()
@@ -127,6 +147,36 @@ int Node::process()
}
int ret = myPlugin->process(inBuffers, outBuffers);
+
+ // Collect all incoming metadata into a single vector
+ meta_vec_t all_input_mds;
+ for (auto& md_vec_sp : myInputMetadata) {
+ if (md_vec_sp) {
+ copy(md_vec_sp->begin(), md_vec_sp->end(),
+ back_inserter(all_input_mds));
+ }
+ }
+
+ auto mod_meta = dynamic_pointer_cast<ModMetadata>(myPlugin);
+ if (mod_meta) {
+ auto outputMetadata = mod_meta->process_metadata(all_input_mds);
+ // Distribute the result metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(outputMetadata.begin(), outputMetadata.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+ else {
+ // Propagate the unmodified input metadata to all outputs
+ for (auto& out_md : myOutputMetadata) {
+ out_md->clear();
+ std::move(all_input_mds.begin(), all_input_mds.end(),
+ std::back_inserter(*out_md));
+ }
+ }
+
+
#if DEBUG
assert(myDebugFiles.size() == myOutputBuffers.size());
@@ -158,8 +208,10 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
this);
myBuffer = make_shared<Buffer>();
- srcNode->addOutputBuffer(myBuffer);
- dstNode->addInputBuffer(myBuffer);
+ myMetadata = make_shared<vector<flowgraph_metadata> >();
+
+ srcNode->addOutputBuffer(myBuffer, myMetadata);
+ dstNode->addInputBuffer(myBuffer, myMetadata);
}
@@ -168,8 +220,8 @@ Edge::~Edge()
PDEBUG("Edge::~Edge() @ %p\n", this);
if (myBuffer) {
- mySrcNode->removeOutputBuffer(myBuffer);
- myDstNode->removeInputBuffer(myBuffer);
+ mySrcNode->removeOutputBuffer(myBuffer, myMetadata);
+ myDstNode->removeInputBuffer(myBuffer, myMetadata);
}
}