1 /*
2  * Copyright 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
/*
 * FlowGraphNode.h
 *
 * Processing nodes and ports that can be used in a simple data flow graph.
 * This was designed to work with audio but could be used for other
 * types of data.
 */
24 
25 #ifndef FLOWGRAPH_FLOW_GRAPH_NODE_H
26 #define FLOWGRAPH_FLOW_GRAPH_NODE_H
27 
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <math.h>
#include <memory>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
#include <vector>
36 
37 // TODO Move these classes into separate files.
38 // TODO Review use of raw pointers for connect(). Maybe use smart pointers but need to avoid
39 //      run-time deallocation in audio thread.
40 
// Derive FLOWGRAPH_ANDROID_INTERNAL and FLOWGRAPH_OUTER_NAMESPACE from the compiler flag
// __ANDROID_NDK__. __ANDROID_NDK__ should be defined in oboe and not aaudio.
// A macro that is already defined before this point is left untouched.

#if !defined(FLOWGRAPH_ANDROID_INTERNAL)
#if defined(__ANDROID_NDK__)
#define FLOWGRAPH_ANDROID_INTERNAL 0
#else
#define FLOWGRAPH_ANDROID_INTERNAL 1
#endif // defined(__ANDROID_NDK__)
#endif // !defined(FLOWGRAPH_ANDROID_INTERNAL)

#if !defined(FLOWGRAPH_OUTER_NAMESPACE)
#if defined(__ANDROID_NDK__)
#define FLOWGRAPH_OUTER_NAMESPACE oboe
#else
#define FLOWGRAPH_OUTER_NAMESPACE aaudio
#endif // defined(__ANDROID_NDK__)
#endif // !defined(FLOWGRAPH_OUTER_NAMESPACE)
59 
60 namespace FLOWGRAPH_OUTER_NAMESPACE::flowgraph {
61 
// Default block size, in frames, that can be overridden when the FlowGraphPortFloat is created.
// If it is too small then we will have too much overhead from switching between nodes.
// If it is too high then we will thrash the caches.
//
// Declared "inline" so that every translation unit including this header refers to the
// same entity instead of getting its own internal-linkage copy.
inline constexpr int kDefaultBufferSize = 8; // arbitrary

// Forward declarations for types that are referenced before they are defined.
class FlowGraphPort;
class FlowGraphPortFloatInput;
69 
70 /***************************************************************************/
71 /**
72  * Base class for all nodes in the flowgraph.
73  */
74 class FlowGraphNode {
75 public:
76     FlowGraphNode() = default;
77     virtual ~FlowGraphNode() = default;
78 
79     /**
80      * Read from the input ports,
81      * generate multiple frames of data then write the results to the output ports.
82      *
83      * @param numFrames maximum number of frames requested for processing
84      * @return number of frames actually processed
85      */
86     virtual int32_t onProcess(int32_t numFrames) = 0;
87 
88     /**
89      * If the callCount is at or after the previous callCount then call
90      * pullData on all of the upstreamNodes.
91      * Then call onProcess().
92      * This prevents infinite recursion in case of cyclic graphs.
93      * It also prevents nodes upstream from a branch from being executed twice.
94      *
95      * @param callCount
96      * @param numFrames
97      * @return number of frames valid
98      */
99     int32_t pullData(int32_t numFrames, int64_t callCount);
100 
101     /**
102      * Recursively reset all the nodes in the graph, starting from a Sink.
103      *
104      * This must not be called at the same time as pullData!
105      */
106     void pullReset();
107 
108     /**
109      * Reset framePosition counters.
110      */
111     virtual void reset();
112 
addInputPort(FlowGraphPort & port)113     void addInputPort(FlowGraphPort &port) {
114         mInputPorts.emplace_back(port);
115     }
116 
isDataPulledAutomatically()117     bool isDataPulledAutomatically() const {
118         return mDataPulledAutomatically;
119     }
120 
121     /**
122      * Set true if you want the data pulled through the graph automatically.
123      * This is the default.
124      *
125      * Set false if you want to pull the data from the input ports in the onProcess() method.
126      * You might do this, for example, in a sample rate converting node.
127      *
128      * @param automatic
129      */
setDataPulledAutomatically(bool automatic)130     void setDataPulledAutomatically(bool automatic) {
131         mDataPulledAutomatically = automatic;
132     }
133 
getName()134     virtual const char *getName() {
135         return "FlowGraph";
136     }
137 
getLastCallCount()138     int64_t getLastCallCount() {
139         return mLastCallCount;
140     }
141 
142 protected:
143 
144     static constexpr int64_t  kInitialCallCount = -1;
145     int64_t  mLastCallCount = kInitialCallCount;
146 
147     std::vector<std::reference_wrapper<FlowGraphPort>> mInputPorts;
148 
149 private:
150     bool     mDataPulledAutomatically = true;
151     bool     mBlockRecursion = false;
152     int32_t  mLastFrameCount = 0;
153 
154 };
155 
156 /***************************************************************************/
157 /**
158   * This is a connector that allows data to flow between modules.
159   *
160   * The ports are the primary means of interacting with a module.
161   * So they are generally declared as public.
162   *
163   */
164 class FlowGraphPort {
165 public:
FlowGraphPort(FlowGraphNode & parent,int32_t samplesPerFrame)166     FlowGraphPort(FlowGraphNode &parent, int32_t samplesPerFrame)
167             : mContainingNode(parent)
168             , mSamplesPerFrame(samplesPerFrame) {
169     }
170 
171     virtual ~FlowGraphPort() = default;
172 
173     // Ports are often declared public. So let's make them non-copyable.
174     FlowGraphPort(const FlowGraphPort&) = delete;
175     FlowGraphPort& operator=(const FlowGraphPort&) = delete;
176 
getSamplesPerFrame()177     int32_t getSamplesPerFrame() const {
178         return mSamplesPerFrame;
179     }
180 
181     virtual int32_t pullData(int64_t framePosition, int32_t numFrames) = 0;
182 
pullReset()183     virtual void pullReset() {}
184 
185 protected:
186     FlowGraphNode &mContainingNode;
187 
188 private:
189     const int32_t    mSamplesPerFrame = 1;
190 };
191 
192 /***************************************************************************/
193 /**
194  * This port contains a 32-bit float buffer that can contain several frames of data.
195  * Processing the data in a block improves performance.
196  *
197  * The size is framesPerBuffer * samplesPerFrame).
198  */
199 class FlowGraphPortFloat  : public FlowGraphPort {
200 public:
201     FlowGraphPortFloat(FlowGraphNode &parent,
202                    int32_t samplesPerFrame,
203                    int32_t framesPerBuffer = kDefaultBufferSize
204                 );
205 
206     virtual ~FlowGraphPortFloat() = default;
207 
getFramesPerBuffer()208     int32_t getFramesPerBuffer() const {
209         return mFramesPerBuffer;
210     }
211 
212 protected:
213 
214     /**
215      * @return buffer internal to the port or from a connected port
216      */
getBuffer()217     virtual float *getBuffer() {
218         return mBuffer.get();
219     }
220 
221 private:
222     const int32_t    mFramesPerBuffer = 1;
223     std::unique_ptr<float[]> mBuffer; // allocated in constructor
224 };
225 
226 /***************************************************************************/
227 /**
228   * The results of a node's processing are stored in the buffers of the output ports.
229   */
230 class FlowGraphPortFloatOutput : public FlowGraphPortFloat {
231 public:
FlowGraphPortFloatOutput(FlowGraphNode & parent,int32_t samplesPerFrame)232     FlowGraphPortFloatOutput(FlowGraphNode &parent, int32_t samplesPerFrame)
233             : FlowGraphPortFloat(parent, samplesPerFrame) {
234     }
235 
236     virtual ~FlowGraphPortFloatOutput() = default;
237 
238     using FlowGraphPortFloat::getBuffer;
239 
240     /**
241      * Connect to the input of another module.
242      * An input port can only have one connection.
243      * An output port can have multiple connections.
244      * If you connect a second output port to an input port
245      * then it overwrites the previous connection.
246      *
247      * This not thread safe. Do not modify the graph topology from another thread while running.
248      * Also do not delete a module while it is connected to another port if the graph is running.
249      */
250     void connect(FlowGraphPortFloatInput *port);
251 
252     /**
253      * Disconnect from the input of another module.
254      * This not thread safe.
255      */
256     void disconnect(FlowGraphPortFloatInput *port);
257 
258     /**
259      * Call the parent module's onProcess() method.
260      * That may pull data from its inputs and recursively
261      * process the entire graph.
262      * @return number of frames actually pulled
263      */
264     int32_t pullData(int64_t framePosition, int32_t numFrames) override;
265 
266 
267     void pullReset() override;
268 
269 };
270 
271 /***************************************************************************/
272 
273 /**
274  * An input port for streaming audio data.
275  * You can set a value that will be used for processing.
276  * If you connect an output port to this port then its value will be used instead.
277  */
278 class FlowGraphPortFloatInput : public FlowGraphPortFloat {
279 public:
FlowGraphPortFloatInput(FlowGraphNode & parent,int32_t samplesPerFrame)280     FlowGraphPortFloatInput(FlowGraphNode &parent, int32_t samplesPerFrame)
281             : FlowGraphPortFloat(parent, samplesPerFrame) {
282         // Add to parent so it can pull data from each input.
283         parent.addInputPort(*this);
284     }
285 
286     virtual ~FlowGraphPortFloatInput() = default;
287 
288     /**
289      * If connected to an output port then this will return
290      * that output ports buffers.
291      * If not connected then it returns the input ports own buffer
292      * which can be loaded using setValue().
293      */
294     float *getBuffer() override;
295 
296     /**
297      * Write every value of the float buffer.
298      * This value will be ignored if an output port is connected
299      * to this port.
300      */
setValue(float value)301     void setValue(float value) {
302         int numFloats = kDefaultBufferSize * getSamplesPerFrame();
303         float *buffer = getBuffer();
304         for (int i = 0; i < numFloats; i++) {
305             *buffer++ = value;
306         }
307     }
308 
309     /**
310      * Connect to the output of another module.
311      * An input port can only have one connection.
312      * An output port can have multiple connections.
313      * This not thread safe.
314      */
connect(FlowGraphPortFloatOutput * port)315     void connect(FlowGraphPortFloatOutput *port) {
316         assert(getSamplesPerFrame() == port->getSamplesPerFrame());
317         mConnected = port;
318     }
319 
disconnect(FlowGraphPortFloatOutput * port)320     void disconnect(FlowGraphPortFloatOutput *port) {
321         assert(mConnected == port);
322         (void) port;
323         mConnected = nullptr;
324     }
325 
disconnect()326     void disconnect() {
327         mConnected = nullptr;
328     }
329 
330     /**
331      * Pull data from any output port that is connected.
332      */
333     int32_t pullData(int64_t framePosition, int32_t numFrames) override;
334 
335     void pullReset() override;
336 
337 private:
338     FlowGraphPortFloatOutput *mConnected = nullptr;
339 };
340 
341 /***************************************************************************/
342 
343 /**
344  * Base class for an edge node in a graph that has no upstream nodes.
345  * It outputs data but does not consume data.
346  * By default, it will read its data from an external buffer.
347  */
348 class FlowGraphSource : public FlowGraphNode {
349 public:
FlowGraphSource(int32_t channelCount)350     explicit FlowGraphSource(int32_t channelCount)
351             : output(*this, channelCount) {
352     }
353 
354     virtual ~FlowGraphSource() = default;
355 
356     FlowGraphPortFloatOutput output;
357 };
358 
359 /***************************************************************************/
360 
361 /**
362  * Base class for an edge node in a graph that has no upstream nodes.
363  * It outputs data but does not consume data.
364  * By default, it will read its data from an external buffer.
365  */
366 class FlowGraphSourceBuffered : public FlowGraphSource {
367 public:
FlowGraphSourceBuffered(int32_t channelCount)368     explicit FlowGraphSourceBuffered(int32_t channelCount)
369             : FlowGraphSource(channelCount) {}
370 
371     virtual ~FlowGraphSourceBuffered() = default;
372 
373     /**
374      * Specify buffer that the node will read from.
375      *
376      * @param data TODO Consider using std::shared_ptr.
377      * @param numFrames
378      */
setData(const void * data,int32_t numFrames)379     void setData(const void *data, int32_t numFrames) {
380         mData = data;
381         mSizeInFrames = numFrames;
382         mFrameIndex = 0;
383     }
384 
385 protected:
386     const void *mData = nullptr;
387     int32_t     mSizeInFrames = 0; // number of frames in mData
388     int32_t     mFrameIndex = 0; // index of next frame to be processed
389 };
390 
391 /***************************************************************************/
392 /**
393  * Base class for an edge node in a graph that has no downstream nodes.
394  * It consumes data but does not output data.
395  * This graph will be executed when data is read() from this node
396  * by pulling data from upstream nodes.
397  */
398 class FlowGraphSink : public FlowGraphNode {
399 public:
FlowGraphSink(int32_t channelCount)400     explicit FlowGraphSink(int32_t channelCount)
401             : input(*this, channelCount) {
402     }
403 
404     virtual ~FlowGraphSink() = default;
405 
406     FlowGraphPortFloatInput input;
407 
408     /**
409      * Do nothing. The work happens in the read() method.
410      *
411      * @param numFrames
412      * @return number of frames actually processed
413      */
onProcess(int32_t numFrames)414     int32_t onProcess(int32_t numFrames) override {
415         return numFrames;
416     }
417 
418     virtual int32_t read(void *data, int32_t numFrames) = 0;
419 
420 protected:
421     /**
422      * Pull data through the graph using this nodes last callCount.
423      * @param numFrames
424      * @return
425      */
426     int32_t pullData(int32_t numFrames);
427 };
428 
429 /***************************************************************************/
430 /**
431  * Base class for a node that has an input and an output with the same number of channels.
432  * This may include traditional filters, eg. FIR, but also include
433  * any processing node that converts input to output.
434  */
435 class FlowGraphFilter : public FlowGraphNode {
436 public:
FlowGraphFilter(int32_t channelCount)437     explicit FlowGraphFilter(int32_t channelCount)
438             : input(*this, channelCount)
439             , output(*this, channelCount) {
440     }
441 
442     virtual ~FlowGraphFilter() = default;
443 
444     FlowGraphPortFloatInput input;
445     FlowGraphPortFloatOutput output;
446 };
447 
448 } /* namespace FLOWGRAPH_OUTER_NAMESPACE::flowgraph */
449 
450 #endif /* FLOWGRAPH_FLOW_GRAPH_NODE_H */
451