bridgekat/flowgraph

Flowgraph

A multithreaded executor for static computation graphs.

Each compute node can hold a mutable state, read inputs and produce outputs that may contain references into the node's state or into its inputs; Flowgraph guarantees that no dangling reference can ever be read.

Multiple nodes can be fused into a single node, or scheduled independently to run in parallel.

Examples

A diamond-shaped graph with source s, a = s + 1, and d = s + a:

use flowgraph::core::Pool;
use flowgraph::typed::{Graph, GraphBuilder, Port, Segment, Source};

/// A stateless increment operator.
struct Inc;

impl Segment for Inc {
    type Inputs = Port<i64>;
    type Outputs = Port<i64>;
    type State = ();

    fn init(self) {}

    fn compute<'a, 'b: 'a>(
        (x_notify, x): (bool, i64),
        _state: &'b mut (),
        _is_first_run: bool
    ) -> (bool, i64) {
        (x_notify, x + 1)
    }
}

/// A stateless adder with two inputs.
struct Add;

impl Segment for Add {
    type Inputs = (Port<i64>, Port<i64>);
    type Outputs = Port<i64>;
    type State = ();

    fn init(self) {}

    fn compute<'a, 'b: 'a>(
        ((a_notify, a), (b_notify, b)): ((bool, i64), (bool, i64)),
        _state: &'b mut (),
        _is_first_run: bool
    ) -> (bool, i64) {
        (a_notify || b_notify, a + b)
    }
}

// Create the thread pool.
let mut pool = Pool::new(std::thread::available_parallelism().unwrap().get());

// Create the graph, which runs each segment for the first time.
let mut b = GraphBuilder::new();
let s = b.push_source(Source::new(1));
let a = b.push(Inc, *s);
let d = b.push(Add, (*s, a));
let mut g = Graph::from_builder(b);

// Check initial values.
assert_eq!(g.view(*s), 1);
assert_eq!(g.view(a), 2);
assert_eq!(g.view(d), 3);

// Update the source value and recompute in parallel.
*g.state_mut(s) = 5;
g.stabilize(&mut pool);

// Check updated values.
assert_eq!(g.view(a), 6);
assert_eq!(g.view(d), 11);

Concepts

Segments

Each unit of scheduling is a Segment: it has typed inputs and outputs, a mutable state, and a compute function.

use flowgraph::typed::*;

pub trait Segment {
    type Inputs: Interface;
    type Outputs: Interface;
    type State: Send + 'static;

    fn init(self) -> Self::State;

    fn compute<'a, 'b: 'a>(
        inputs: <Self::Inputs as Interface>::Values<'a>,
        state: &'b mut Self::State,
        is_first_run: bool,
    ) -> <Self::Outputs as Interface>::Values<'a>;
}

The init method will be called once during graph construction to create the node's state. Immediately after, the compute method will be run once with is_first_run == true to obtain initial/default output values.

On each subsequent graph.stabilize call, the compute method will be run again with new inputs and is_first_run == false.
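To make the call order concrete, here is a minimal std-only sketch of this lifecycle. MiniSegment, Node and Accumulate are hypothetical names that do not exist in Flowgraph; ports are simplified to by-value (bool, i64) pairs, and there are no lifetimes, threads or graph edges.

```rust
/// Hypothetical, simplified segment trait: by-value (bool, i64) ports, no lifetimes.
trait MiniSegment {
    type State;
    fn init(self) -> Self::State;
    fn compute(input: (bool, i64), state: &mut Self::State, is_first_run: bool) -> (bool, i64);
}

/// Hypothetical single-node driver mirroring the lifecycle described above.
struct Node<S: MiniSegment> {
    state: S::State,
    output: (bool, i64),
}

impl<S: MiniSegment> Node<S> {
    /// Construction: `init` once, then one `compute` with `is_first_run == true`.
    fn build(seg: S, input: (bool, i64)) -> Self {
        let mut state = seg.init();
        let output = S::compute(input, &mut state, true);
        Node { state, output }
    }

    /// Each later stabilization: `compute` again with `is_first_run == false`.
    fn stabilize(&mut self, input: (bool, i64)) {
        self.output = S::compute(input, &mut self.state, false);
    }
}

/// Example segment: a running total of every notified input.
struct Accumulate;

impl MiniSegment for Accumulate {
    type State = i64; // the running total

    fn init(self) -> i64 {
        0
    }

    fn compute((notify, x): (bool, i64), total: &mut i64, is_first_run: bool) -> (bool, i64) {
        if is_first_run {
            *total = x; // initial/default output
            (true, *total)
        } else if notify {
            *total += x;
            (true, *total)
        } else {
            (false, *total) // no notification: repeat the previous output
        }
    }
}
```

Building with input (true, 3) yields (true, 3); stabilizing with (true, 4) then (false, 4) yields (true, 7) and then (false, 7).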

Interfaces

The associated types Inputs and Outputs define the interface of a segment. They can be constructed from the following building blocks:

  • Port<T> — a single pass-by-value port. It carries (bool, T) where T: Copy + Send + Sync.
  • RefPort<T> — a single pass-by-reference port. It carries (bool, &T) where T: Sync.
  • RefPorts<T> — a dynamic-length group of pass-by-reference ports. It carries (&[bool], &[&T]) where T: Sync, and is compatible with a group of RefPort<T>s.
  • Generalizations of the above: ViewPort<V>, RefViewPort<V> and RefViewPorts<V>, which allow passing custom lifetime-carrying Copy views (like slices or custom array views) to some underlying data.
  • Arbitrarily nested tuples of the above (each tuple up to arity 12).
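The following sketch shows one way an interface trait with a generic associated type could map port types to the values a compute function receives. It is a hypothetical re-creation: the Interface, Port and RefPort definitions here are not Flowgraph's, and the 'static bounds are a simplification. It shows how a type like <(Port<i64>, Port<i64>) as Interface>::Values<'a> resolves to ((bool, i64), (bool, i64)).

```rust
use std::marker::PhantomData;

/// Hypothetical interface trait: `Values<'a>` is what a compute function sees.
trait Interface {
    type Values<'a>: Copy;
}

/// Hypothetical marker types; they are never constructed, only used at the type level.
struct Port<T>(PhantomData<T>);
struct RefPort<T>(PhantomData<T>);

/// Pass-by-value port: carries (notify flag, value).
impl<T: Copy + Send + Sync + 'static> Interface for Port<T> {
    type Values<'a> = (bool, T);
}

/// Pass-by-reference port: carries (notify flag, reference living for 'a).
impl<T: Sync + 'static> Interface for RefPort<T> {
    type Values<'a> = (bool, &'a T);
}

/// Pairs of interfaces carry pairs of values (larger tuples would follow the same pattern).
impl<A: Interface, B: Interface> Interface for (A, B) {
    type Values<'a> = (A::Values<'a>, B::Values<'a>);
}

/// An `Add`-like compute written against the projected types.
fn add_values<'a>(
    ((a_notify, a), (b_notify, b)): <(Port<i64>, Port<i64>) as Interface>::Values<'a>,
) -> <Port<i64> as Interface>::Values<'a> {
    (a_notify || b_notify, a + b)
}

/// Reads through a reference port and outputs a by-value length.
fn str_len<'a>(
    (notify, s): <RefPort<String> as Interface>::Values<'a>,
) -> <Port<usize> as Interface>::Values<'a> {
    (notify, s.len())
}
```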

For dynamic-length groups, the length must stay fixed after the first run, so that the computation graph remains well-defined and static. Violations are detected at runtime and cause a panic.
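A runtime check of this kind can be as simple as remembering the first length. FixedLen is a hypothetical name, and this is only a sketch of the idea, not Flowgraph's implementation:

```rust
/// Hypothetical guard for one dynamic-length port group: remembers the length
/// from the first run and panics if any later run disagrees.
struct FixedLen {
    len: Option<usize>,
}

impl FixedLen {
    fn new() -> Self {
        FixedLen { len: None }
    }

    fn check(&mut self, len: usize) {
        match self.len {
            None => self.len = Some(len), // the first run fixes the length
            Some(expected) => assert_eq!(
                expected, len,
                "dynamic-length port group changed length after the first run"
            ),
        }
    }
}
```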

Producing a RefPorts output requires creating a slice of references with lifetime 'a matching the inputs. This makes forwarding input references easy, but it is harder for a node that computes its own values: the array of references has to live somewhere, yet the node state must have a 'static lifetime. To address this, keep a lifetime-erased bump arena (such as bumpalo) in the node state as a scratch buffer that stores the reference arrays on each recompute. Clearing the arena at the beginning of each recompute keeps the buffer size bounded.
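Below is a std-only sketch of the scratch-buffer idea. Instead of a bump arena it swaps in a simpler technique: a Vec of raw pointers, which carries no lifetime and can therefore live in a 'static state, reinterpreted as a slice of references on each recompute. Scratch and refs are hypothetical names; treat the unsafe code as an illustration to be checked (for example under Miri), not a vetted implementation.

```rust
/// Hypothetical scratch buffer for building `&'a [&'a T]` outputs from a `'static`
/// node state. It stores raw pointers, so the stored type mentions no lifetime.
struct Scratch<T> {
    ptrs: Vec<*const T>,
}

// SAFETY: stale pointers left in `ptrs` between calls are never dereferenced;
// they are only read through the slice returned by `refs`, which borrows `self`.
unsafe impl<T: Sync> Send for Scratch<T> {}

impl<T> Scratch<T> {
    fn new() -> Self {
        Scratch { ptrs: Vec::new() }
    }

    /// Clears the buffer (keeping its capacity, so its size stays bounded),
    /// refills it with `items`, and returns them as references living for `'a`.
    fn refs<'a, 'b: 'a>(&'b mut self, items: impl IntoIterator<Item = &'a T>) -> &'a [&'a T] {
        self.ptrs.clear();
        self.ptrs.extend(items.into_iter().map(|r| r as *const T));
        // SAFETY: `*const T` and `&T` have the same layout; every pointer was created
        // from a live `&'a T`, and `self` stays mutably borrowed for `'b: 'a`, so the
        // buffer cannot be cleared or reallocated while the returned slice is in use.
        unsafe { std::slice::from_raw_parts(self.ptrs.as_ptr().cast::<&'a T>(), self.ptrs.len()) }
    }
}
```

Because refs keeps self mutably borrowed for as long as the returned slice is used, the compiler rejects refilling the buffer while a previous output slice is still alive.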

In Flowgraph, interface values are constrained to Copy because they must have trivial Drop implementations. Data is always owned by node states and never passed through an interface; only simple scalar values, references and views pass through. This encourages pass-by-reference for complex data types and simplifies Flowgraph's internal implementation, but may be less ergonomic in some cases.
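A quick way to see which values could pass through an interface is a compile-time bound check. The helper below is hypothetical, and it assumes interface values must be Copy + Send + Sync: the Copy part comes from the text above, while Send + Sync is an assumption based on values being read on other worker threads.

```rust
/// Hypothetical compile-time check: accepts only Copy + Send + Sync values.
fn assert_interface_value<T: Copy + Send + Sync>(_value: &T) {}

/// Owned data stays in a node state; only flags, scalars and references pass through.
fn demo() -> usize {
    let owned = String::from("payload"); // stands in for data owned by a node state
    assert_interface_value(&(true, 42_i64)); // Port<i64>-style value
    assert_interface_value(&(true, &owned)); // RefPort<String>-style value
    let flags = [true, false];
    let refs = [&owned, &owned];
    assert_interface_value(&(&flags[..], &refs[..])); // RefPorts<String>-style value
    // assert_interface_value(&(true, owned.clone())); // rejected: String is not Copy
    refs.len()
}
```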

Notification flags

Each port carries a bool flag alongside its value or reference, indicating whether the port is sending a notification. At each generation, an output whose value did not change can have its flag set to false (no notify), so a downstream node can choose to skip heavy computation.

use flowgraph::typed::*;

struct Abs;

impl Segment for Abs {
    type Inputs = Port<i64>;
    type Outputs = Port<i64>;
    type State = i64; // Remembers the previous output.

    fn init(self) -> i64 { 0 }

    fn compute<'a, 'b: 'a>(
        (x_notify, x): (bool, i64),
        state: &'b mut i64, 
        _is_first_run: bool
    ) -> (bool, i64) {
        if x_notify {
            // Input notified, recompute.
            let output = x.abs();
            let changed = output != *state; // Notify downstream only when |x| actually changes.
            *state = output;
            (changed, output)
        } else {
            // Input did not notify, so |x| is unchanged: repeat the previous output.
            (false, *state)
        }
    }
}

Treat the flag as a contract that every well-behaved segment upholds: whenever an output flag is false, the value at that port is equal (by Eq) to the value it held in the previous generation. Therefore:

  • A port with notify == true indicates a possible change in its value;
  • A port with notify == false indicates no change in its value.
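The contract is easy to check mechanically on a port's output history. This sketch restates the Abs logic above as a plain function (abs_step, a hypothetical name) and adds a hypothetical checker, upholds_flag_contract, that compares each unnotified value with its predecessor:

```rust
/// The `Abs` compute logic from above, as a plain function (no library types).
fn abs_step((x_notify, x): (bool, i64), state: &mut i64) -> (bool, i64) {
    if x_notify {
        let output = x.abs();
        let changed = output != *state; // notify only when |x| actually changes
        *state = output;
        (changed, output)
    } else {
        (false, *state) // no notification: repeat the previous output
    }
}

/// Checks the flag contract on one port's output history: whenever a flag is
/// `false`, the value must equal the value from the previous generation.
fn upholds_flag_contract<T: PartialEq>(history: &[(bool, T)]) -> bool {
    history.windows(2).all(|w| w[1].0 || w[1].1 == w[0].1)
}
```

Feeding the inputs (true, 3), (true, -3), (false, -3), (true, -5) produces (true, 3), (false, 3), (false, 3), (true, 5): the -3 input notifies, but since |-3| equals the previous output, Abs suppresses the downstream notification.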

The flag can be interpreted in an alternative way:

  • A port with notify == true indicates a new event has arrived, with its value being the payload of the event;
  • A port with notify == false indicates no event has arrived, with its value being the payload of the last known event.

The two interpretations are compatible: a new event is a change in the event payload's conceptual identity, and a value change is a new event.

The notification flags also help with scheduling. At each generation, the graph executor sets the flags of modified source nodes and skips a node entirely if none of its upstream source nodes were modified. As a result, stabilization after a sparse update touches only a fraction of the graph.
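Which nodes need to run can be computed in a single pass over a topologically ordered graph. This is a hypothetical sketch of the idea (dirty_cone is not a Flowgraph function, and the real executor may track this differently):

```rust
/// Marks every node that lies downstream of (or is) a modified source.
/// `preds[i]` lists the direct upstream nodes of node `i`; nodes are assumed to be
/// indexed in topological order, i.e. every predecessor has a smaller index.
fn dirty_cone(preds: &[Vec<usize>], modified_sources: &[usize]) -> Vec<bool> {
    let mut dirty = vec![false; preds.len()];
    for &s in modified_sources {
        dirty[s] = true;
    }
    // Predecessors are visited first, so their dirtiness is final when we read it.
    for i in 0..preds.len() {
        if preds[i].iter().any(|&p| dirty[p]) {
            dirty[i] = true;
        }
    }
    dirty
}
```

For the diamond from the Examples section (s = 0, a = 1, d = 2) plus an independent source t = 3 feeding a node u = 4, modifying only s marks s, a and d, while t and u are skipped.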

Segment fusion

Multi-threaded execution has a cost: every unit of work is a thread-pool task, and waking a worker to run a trivial node can take much longer than the computation inside the node itself. It is therefore desirable to fuse tiny segments into a single node. (Heavy segments can be fused as well, but segments fused into one node no longer run in parallel with each other.)

The library provides combinator methods to fuse segments into larger segments:

| Combinator | Method | Meaning |
| --- | --- | --- |
| Id<T> | | Identity: outputs are the inputs, unchanged. |
| Comp(f, g) | f.then(g) | Composition: outputs g(f(x)). |
| Left<T, U> / Right<T, U> | | Projection: outputs the first / second element of a pair. |
| Fork(f, g) | f.fork(g) | Fan-out: feeds the same input to both f and g, then pairs their outputs. |
| Par(f, g) | f.par(g) | Parallel composition: runs f and g on the two halves of a paired input, then pairs their outputs. Equivalent to Fork(Comp(Left, f), Comp(Right, g)). |
| Arr::new(\|values, _\| …) | | Applies a stateful closure to the inputs. |
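To illustrate the equivalence stated for Par, here are the stateless cores of these combinators as plain Rust functions. This is a hypothetical model that ignores notification flags and state; the names mirror the table but are not the library's API.

```rust
/// Identity.
fn id<A>(x: A) -> A {
    x
}

/// Composition: run `f`, then `g`.
fn then<A, B, C>(f: impl Fn(A) -> B, g: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |x: A| g(f(x))
}

/// Projections out of a pair.
fn left<A, B>((a, _): (A, B)) -> A {
    a
}
fn right<A, B>((_, b): (A, B)) -> B {
    b
}

/// Fan-out: the input is used twice, which is why it must be `Copy`.
fn fork<A: Copy, B, C>(f: impl Fn(A) -> B, g: impl Fn(A) -> C) -> impl Fn(A) -> (B, C) {
    move |x: A| (f(x), g(x))
}

/// Parallel composition (in the arrow sense): `f` on the first half, `g` on the second.
fn par<A, B, C, D>(f: impl Fn(A) -> C, g: impl Fn(B) -> D) -> impl Fn((A, B)) -> (C, D) {
    move |(a, b): (A, B)| (f(a), g(b))
}
```

With f = |x| x + 1 and g = |x| x * 10, both par(f, g) and fork(then(left, f), then(right, g)) map (1, 2) to (2, 20).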

However, point-free combinators quickly become unreadable. As an alternative, the library provides the segment! macro, a DSL that compiles down to combinators, similar to the arrow notation in Haskell (see footnote 1):

use flowgraph::typed::*;
use flowgraph::segment;

struct Add;

impl Segment for Add {
    type Inputs = (Port<i64>, Port<i64>);
    type Outputs = Port<i64>;
    type State = ();

    fn init(self) {}

    fn compute<'a, 'b: 'a>(
        ((a_notify, a), (b_notify, b)): ((bool, i64), (bool, i64)),
        _state: &'b mut (),
        _is_first_run: bool
    ) -> (bool, i64) {
        (a_notify || b_notify, a + b)
    }
}

let mut b = GraphBuilder::new();
let s = b.push_source(Source::new(1));
let t = b.push_source(Source::new(2));

// One fused node computing: c = b + a; d = c + a; e = d + a; result (e, d, c).
// Type annotation is needed on inputs and outputs.
let seg = segment!(|a: Port<i64>, b: Port<i64>| -> (Port<i64>, Port<i64>, Port<i64>) {
    let c = (b, a) => Add;
    let d = (c, a) => Add;
    let e = (d, a) => Add;
    (e, d, c)
});

let (e, d, c) = b.push(seg, (*s, *t));
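One way to picture "compiles down to combinators" is a hand desugaring that threads an environment tuple through Fork and Comp, in the style of Haskell's arrow notation. This is a hypothetical sketch with plain functions (ignoring notification flags and state); it is not necessarily what segment! actually expands to.

```rust
/// Fan-out: feed the same (Copy) input to two functions and pair the results.
fn fork<A: Copy, B, C>(f: impl Fn(A) -> B, g: impl Fn(A) -> C) -> impl Fn(A) -> (B, C) {
    move |x: A| (f(x), g(x))
}

/// Composition: run `f`, then `g`.
fn then<A, B, C>(f: impl Fn(A) -> B, g: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |x: A| g(f(x))
}

/// Stand-in for the `Add` segment, ignoring notification flags and state.
fn add((x, y): (i64, i64)) -> i64 {
    x + y
}

/// Each `let` binding pushes a new value onto the environment;
/// the final line projects out the requested outputs.
fn desugared() -> impl Fn((i64, i64)) -> (i64, i64, i64) {
    // env0 = (a, b)  ->  env1 = (c, env0), where c = b + a
    let s1 = fork(|(a, b): (i64, i64)| add((b, a)), |env: (i64, i64)| env);
    // env1 -> env2 = (d, env1), where d = c + a
    let s2 = fork(
        |(c, (a, _b)): (i64, (i64, i64))| add((c, a)),
        |env: (i64, (i64, i64))| env,
    );
    // env2 -> env3 = (e, env2), where e = d + a
    let s3 = fork(
        |(d, (_c, (a, _b))): (i64, (i64, (i64, i64)))| add((d, a)),
        |env: (i64, (i64, (i64, i64)))| env,
    );
    // Project (e, d, c) out of the final environment.
    let out = |(e, (d, (c, _))): (i64, (i64, (i64, (i64, i64))))| (e, d, c);
    then(then(then(s1, s2), s3), out)
}
```

With a = 1 and b = 2 (the source values above), it yields (e, d, c) = (5, 4, 3).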

Safety

The library uses unsafe code internally, but the typed API is intended to be safe to use. For more details, these are the runtime invariants it maintains internally:

  • Single writer per slot. Each state and output slot has exactly one producing node; each input slot is scattered into by exactly that one producer. Concurrent computes write disjoint slots.
  • Per-generation lifetime. Output pointers stay valid as long as inputs and state are unchanged; passthrough's &State forbids reallocation, keeping out-of-cone consumers' pointers live across generations.
  • Read guard. Reading a slot after poking a source but before stabilize panics rather than dereferencing a possibly-stale forwarded pointer.
  • Poison on panic. If a compute panics, downstream slots may hold dangling forwarded pointers, so the graph is poisoned: the pool catches the panic, settles the batch (no hang), re-raises out of stabilize, and every later stabilize or slot read panics. There is no recovery — treat the graph as dead. For recoverable failures, make the failure a value (e.g. a Result<T, E> cell) instead of panicking.
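The read guard and the poisoning behavior can be modelled with std::panic::catch_unwind and resume_unwind. GuardedGraph is a hypothetical one-node stand-in, not Flowgraph's implementation; it does not model the thread pool or batch settling.

```rust
use std::panic::{self, AssertUnwindSafe};

/// Hypothetical one-node "graph" modelling the read guard and poison-on-panic.
struct GuardedGraph {
    source: i64,
    output: i64,
    pending: bool,  // a source was poked since the last stabilize
    poisoned: bool, // a compute panicked; the graph is dead for good
}

impl GuardedGraph {
    fn new(source: i64) -> Self {
        GuardedGraph { source, output: source, pending: false, poisoned: false }
    }

    /// Poke the source; reads panic until the next stabilize.
    fn poke(&mut self, value: i64) {
        self.source = value;
        self.pending = true;
    }

    /// Read guard: refuse to return a possibly-stale output.
    fn view(&self) -> i64 {
        assert!(!self.poisoned, "graph is poisoned");
        assert!(!self.pending, "read after poking a source but before stabilize");
        self.output
    }

    /// Run `compute`; if it panics, poison the graph and re-raise the original panic.
    fn stabilize(&mut self, compute: impl FnOnce(i64) -> i64) {
        assert!(!self.poisoned, "graph is poisoned");
        let input = self.source;
        match panic::catch_unwind(AssertUnwindSafe(|| compute(input))) {
            Ok(value) => {
                self.output = value;
                self.pending = false;
            }
            Err(payload) => {
                self.poisoned = true;
                panic::resume_unwind(payload);
            }
        }
    }
}
```

After one panicking compute, every later view or stabilize call panics as well, matching the "no recovery" rule above.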

Tests have been run under Miri to check for memory-safety violations and common kinds of undefined behavior.

Footnotes

  1. https://www.haskell.org/arrows/syntax.html
