aboutsummaryrefslogtreecommitdiffstats
path: root/src/Flowgraph.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Flowgraph.cpp')
-rw-r--r--src/Flowgraph.cpp129
1 files changed, 86 insertions, 43 deletions
diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp
index c199945..0eb1c60 100644
--- a/src/Flowgraph.cpp
+++ b/src/Flowgraph.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2015
+ Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -27,6 +27,7 @@
#include "Flowgraph.h"
#include "PcDebug.h"
#include <memory>
+#include <algorithm>
#if HAVE_DECL__MM_MALLOC
# include <mm_malloc.h>
@@ -36,17 +37,12 @@
#include <sys/types.h>
#include <stdexcept>
#include <assert.h>
-#if defined(_WIN32) and !defined(__MINGW32__)
-//#include <sys/timeb.h>
-//#include <sys/types.h>
-#else
#include <sys/time.h>
-#endif
using namespace std;
-typedef std::vector<shared_ptr<Node> >::iterator NodeIterator;
-typedef std::vector<shared_ptr<Edge> >::iterator EdgeIterator;
+using NodeIterator = std::vector<shared_ptr<Node> >::iterator;
+using EdgeIterator = std::vector<shared_ptr<Edge> >::iterator;
Node::Node(shared_ptr<ModPlugin> plugin) :
@@ -55,10 +51,8 @@ Node::Node(shared_ptr<ModPlugin> plugin) :
{
PDEBUG("Node::Node(plugin(%s): %p) @ %p\n",
plugin->name(), plugin.get(), this);
-
}
-
Node::~Node()
{
PDEBUG("Node::~Node() @ %p\n", this);
@@ -67,6 +61,55 @@ Node::~Node()
assert(myOutputBuffers.size() == 0);
}
+void Node::addOutputBuffer(Buffer::sptr& buffer)
+{
+ myOutputBuffers.push_back(buffer);
+#if DEBUG
+ std::string fname = string(myPlugin->name()) +
+ "-" + to_string(myDebugFiles.size()) +
+ "-" + to_string((size_t)(void*)myPlugin.get()) +
+ ".dat";
+ FILE* fd = fopen(fname.c_str(), "wb");
+ assert(fd != nullptr);
+ myDebugFiles.push_back(fd);
+#endif
+}
+
+void Node::removeOutputBuffer(Buffer::sptr& buffer)
+{
+ auto it = std::find(
+ myOutputBuffers.begin(),
+ myOutputBuffers.end(),
+ buffer);
+ if (it != myOutputBuffers.end()) {
+#if DEBUG
+ size_t pos = std::distance(myOutputBuffers.begin(),
+ it);
+
+ auto fd_it = std::next(myDebugFiles.begin(), pos);
+
+ fclose(*fd_it);
+ myDebugFiles.erase(fd_it);
+#endif
+ myOutputBuffers.erase(it);
+ }
+}
+
+void Node::addInputBuffer(Buffer::sptr& buffer)
+{
+ myInputBuffers.push_back(buffer);
+}
+
+void Node::removeInputBuffer(Buffer::sptr& buffer)
+{
+ auto it = std::find(
+ myInputBuffers.begin(),
+ myInputBuffers.end(),
+ buffer);
+ if (it != myInputBuffers.end()) {
+ myInputBuffers.erase(it);
+ }
+}
Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
mySrcNode(srcNode),
@@ -78,8 +121,8 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :
this);
myBuffer = make_shared<Buffer>();
- srcNode->myOutputBuffers.push_back(myBuffer);
- dstNode->myInputBuffers.push_back(myBuffer);
+ srcNode->addOutputBuffer(myBuffer);
+ dstNode->addInputBuffer(myBuffer);
}
@@ -87,52 +130,52 @@ Edge::~Edge()
{
PDEBUG("Edge::~Edge() @ %p\n", this);
- std::vector<shared_ptr<Buffer> >::iterator buffer;
- if (myBuffer != NULL) {
- for (buffer = mySrcNode->myOutputBuffers.begin();
- buffer != mySrcNode->myOutputBuffers.end();
- ++buffer) {
- if (*buffer == myBuffer) {
- mySrcNode->myOutputBuffers.erase(buffer);
- break;
- }
- }
-
- for (buffer = myDstNode->myInputBuffers.begin();
- buffer != myDstNode->myInputBuffers.end();
- ++buffer) {
- if (*buffer == myBuffer) {
- myDstNode->myInputBuffers.erase(buffer);
- break;
- }
- }
+ if (myBuffer) {
+ mySrcNode->removeOutputBuffer(myBuffer);
+ myDstNode->removeInputBuffer(myBuffer);
}
}
int Node::process()
{
- PDEBUG("Edge::process()\n");
+ PDEBUG("Node::process()\n");
PDEBUG(" Plugin name: %s (%p)\n", myPlugin->name(), myPlugin.get());
- // the plugin process() still wants vector<Buffer*>
+ // the plugin process() wants vector<Buffer*>
// arguments.
std::vector<Buffer*> inBuffers;
- std::vector<shared_ptr<Buffer> >::iterator buffer;
- for (buffer = myInputBuffers.begin();
- buffer != myInputBuffers.end();
- ++buffer) {
- inBuffers.push_back(buffer->get());
+ for (auto& buffer : myInputBuffers) {
+ assert(buffer.get() != nullptr);
+ inBuffers.push_back(buffer.get());
}
std::vector<Buffer*> outBuffers;
- for (buffer = myOutputBuffers.begin();
- buffer != myOutputBuffers.end();
- ++buffer) {
- outBuffers.push_back(buffer->get());
+ for (auto& buffer : myOutputBuffers) {
+ assert(buffer.get() != nullptr);
+ outBuffers.push_back(buffer.get());
}
- return myPlugin->process(inBuffers, outBuffers);
+ int ret = myPlugin->process(inBuffers, outBuffers);
+#if DEBUG
+ assert(myDebugFiles.size() == myOutputBuffers.size());
+
+ auto buf = myOutputBuffers.begin();
+ auto fd_it = myDebugFiles.begin();
+
+ for (size_t i = 0; i < myDebugFiles.size(); i++) {
+ if (*buf) {
+ Buffer& b = *buf->get();
+ FILE* fd = *fd_it;
+
+ fwrite(b.getData(), b.getLength(), 1, fd);
+ }
+
+ ++buf;
+ ++fd_it;
+ }
+#endif
+ return ret;
}