• Public
  • Public/Protected
  • All



Graph-driven data processor. qiwi.github.io/protopipe

stability-experimental buildStatus npm (tag) dependencyStatus devDependencyStatus Maintainability Test Coverage CodeStyle


We often come across the problem of atomic data processing (logwrap, uniconfig, cyclone, etc), and it seems to be useful to make the one pipeline to rule them all. Not universal, not high-performance. But dumb and clear.



  yarn add protopipe


import {Graph, IAny, ISpace, NetProcessor} from 'protopipe'

const graph = new Graph({
  edges: ['AB', 'BC'],
  vertexes: ['A', 'B', 'C'],
  incidentor: {
    type: 'EDGE_LIST_INCDR',
    value: {
      'AB': ['A', 'B'],
      'BC': ['B', 'C'],
const handler = {
  // Default handler
  graph: (space: ISpace): IAny => (NetProcessor.getData(space) || {value: 0}).value * 2,
  // Vertex specific handlers
  vertexes: {
    'B': (space: ISpace): IAny => (NetProcessor.getData(space, 'A') || {value: 10}).value * 3,
const protopipe = new NetProcessor({
const space = protopipe.impact(true, ['A', 1]) as ISpace

console.log(NetProcessor.getData(space, 'C').value) // 6
  • Sync / async execution modes.
  • Stateful and stateless contracts.
  • Deep customization.
  • Typings for both worlds — TS and flow.
Limitations (of default executor, traverser, etc)
  • Protopipe supports digraphs only.
  • The lib does not solve the declaration problem (you know, adjacency/incidence matrix, adjacency list, DSL).
  • No consistency checks out of box: graph is being processed as is. But you're able to add custom assertions (for example, BFS-based check).

Definitions and contracts

  • Space is a set with some added structure.
  • Vertex is a graph atom.
  • Edge — bond.
  • Incidentor — the rule of connecting vertexes and edges.
  • Graph — a class that implements IGraph — stores vertexes and edges collections, features and incidentor.
  • Sequence — any possible transition.
    • Walk: vertices may repeat, edges may repeat (closed / open)
    • Trail: vertices may repeat, edges cannot repeat (open)
    • Circuit: vertices may repeat, edges cannot repeat (closed)
    • Path: vertices cannot repeat, edges cannot repeat (open)
    • Cycle : vertices cannot repeat, edges cannot repeat (closed)
  • Pipe is an executable segment of pipeline, any directed sequence with attached handler(s)
  • Handler — lambda-function, which implements IHandler iface.


export type IGraph = {
  vertexes: Array<IVertex>,
  edges: Array<IEdge>,
  incidentor: IGraphIncidentor

export type IGraphRepresentation = any

export type IGraphIncidentor = {
  type: IGraphIncidentorType,
  value: IGraphRepresentation

Sync / async

Pass mode flag as the first .impact() argument to get result or promise.

const graph = new Graph({
  edges: ['AB', 'AC', 'BC', 'BD', 'CD', 'AD'],
  vertexes: ['A', 'B', 'C', 'D'],
  incidentor: {
    type: 'EDGE_LIST_INCDR',
    value: {
      'AB': ['A', 'B'],
      'AC': ['A', 'C'],
      'BC': ['B', 'C'],
      'BD': ['B', 'D'],
      'CD': ['C', 'D'],
      'AD': ['A', 'D'],
const handler = (space: ISpace) => NetProcessor.requireElt('ANCHOR', space).value.vertex
const netProcessor = new NetProcessor({graph, handler})

const res1 = netProcessor.impact(true,'A') as ISpace 
NetProcessor.getData(res1, 'D') // 'D'

netProcessor.impact(false,'A').then((res) => {
  NetProcessor.getData(res, 'D') // 'D'


The lib exposes its inners as ES5, ES6 and TS formats.


You're able to override everything.