pipeline: Directed graph execution model
Our current pipelinelib-based jobs require repos to conform to a number
of rigid conventions: they assume the repo contains source for only a
single application, and they build the "test" variant, run the "test"
variant, build the "production" variant, perform a helm deploy/test, and
publish under a single tag name. These jobs also assume that all of
these operations must be performed linearly.
While this design was sufficient for our very first use cases, its
reliance on convention is already proving prohibitively inflexible. For
example, teams maintaining repos that contain multiple interrelated
applications cannot build and test these applications as independent
images, and teams wanting to execute multiple test suites must wrap them
in a single entrypoint and implement their own concurrency should they
need it, and so on.
Rather than have Release Engineering maintain a new specialized
pipeline job for each team, each performing only slightly different
permutations of the same operations (duplicating job definitions and
creating a large maintenance burden), we can establish a configuration
format and interface through which teams provide their own pipeline
compositions.
This initial commit in a series of pipeline-related commits implements
two fundamental components to support a CI/CD pipeline that can execute
any number of user-defined variant build/test/publish/deploy stages and
steps in a safely concurrent model: a directed-graph-based execution
model, and name bindings for stage outputs. The former provides the
model for composing stage execution; the latter provides a decoupled
system for defining which outputs each subsequent stage operates on.
First, an `ExecutionGraph` class that can represent a directed acyclic
graph given a number of linearly defined arcs (aka branches/edges). This
component will allow users to provide the overall execution flow as
separate linear processes but allow parallel branches of the execution
graph to be scheduled concurrently.
Example:
    /* To represent a graph with separate parallel branches like:
     *
     *   a   x
     *    ⇘ ⇙
     *     b
     *    ⇙ ⇘
     *   y   c
     *    ⇘ ⇙
     *     z
     *
     * One only needs to provide each linear execution arc:
     */
    def graph = new ExecutionGraph([["a", "b", "c", "z"], ["x", "b", "y", "z"]])
    /* The ExecutionGraph can solve how those arcs intersect and how the
     * nodes can be scheduled with a degree of concurrency that Jenkins
     * allows.
     */
    graph.stack() // => [["a", "x"], ["b"], ["c", "y"], ["z"]]
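The frame-by-frame scheduling can be sketched on its own. This is a minimal standalone version under stated assumptions, not the code in this commit. It uses a Kahn-style in-degree count, whereas `ExecutionGraph.stack()` instead checks each successor's predecessors against the set of traversed nodes. `layers` is a hypothetical name. A node joins the next frame once all of its predecessors have been emitted. Any node that is never emitted sits on a cycle.

```groovy
// Sketch of a layered (Kahn-style) topological sort over linear arcs.
// "layers" is a hypothetical name; this is not ExecutionGraph.stack().
List layers(List arcs) {
    Map successors = [:]   // node -> set of direct successors
    Map inDegree = [:]     // node -> number of distinct direct predecessors

    arcs.each { List arc ->
        arc.each { node ->
            if (!successors.containsKey(node)) {
                successors[node] = [] as Set
                inDegree[node] = 0
            }
        }
        // Consecutive pairs [n0, n1], [n1, n2], ... are the arc's edges.
        arc.collate(2, 1, false).each { pair ->
            def (from, to) = pair
            // Count an edge only once, even if several arcs repeat it.
            if (successors[from].add(to)) {
                inDegree[to] += 1
            }
        }
    }

    List frames = []
    List ready = inDegree.findAll { it.value == 0 }.collect { it.key }
    int emitted = 0

    while (ready) {
        frames << ready
        emitted += ready.size()

        // Emitting this frame may free successors for the next one.
        List upcoming = []
        ready.each { node ->
            successors[node].each { succ ->
                inDegree[succ] -= 1
                if (inDegree[succ] == 0) {
                    upcoming << succ
                }
            }
        }
        ready = upcoming
    }

    // Nodes that never reached in-degree zero are part of a cycle.
    if (emitted < inDegree.size()) {
        throw new IllegalStateException("cycle detected")
    }
    frames
}

println layers([["a", "b", "c", "z"], ["x", "b", "y", "z"]])
// [[a, x], [b], [c, y], [z]]
```

The order of nodes inside a frame only reflects insertion order. The grouping into frames is what matters for scheduling.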
Second, a set of context classes for managing immutable global and local
name/value bindings between nodes in the graph. Effectively this will
provide a way for pipeline stages to safely and deterministically
consume inputs from previous stages along the same branch, and to
provide their own outputs for subsequent stages to consume.
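The context classes are not part of the file shown here, so the following is only a sketch of the idea under assumed names. `LocalBindings`, `bind` and `lookup` are hypothetical and are not the `ExecutionContext` API. Each node gets its own namespace of write-once bindings, and a separate instance stands in for the global bindings.

```groovy
// Hypothetical sketch, not the ExecutionContext API: a per-node namespace
// whose bindings can be written exactly once.
class LocalBindings {
    final String node
    private final Map values = [:]

    LocalBindings(String node) {
        this.node = node
    }

    // Bind a name once; a second bind of the same name is rejected.
    void bind(String name, value) {
        if (values.containsKey(name)) {
            throw new IllegalStateException("${node}.${name} is already bound")
        }
        values[name] = value
    }

    // Read a binding; unbound names raise an error instead of returning null.
    def lookup(String name) {
        if (!values.containsKey(name)) {
            throw new NoSuchElementException("${node}.${name} is not bound")
        }
        values[name]
    }
}

// One namespace for global values, one per node for that node's outputs.
def globals = new LocalBindings("global")
globals.bind("project", "example-project")   // hypothetical global value

def build = new LocalBindings("build")
build.bind("imageID", "sha256:0123abcd")     // hypothetical build output
println build.lookup("imageID")
```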
For example, a stage called "build" that builds a container image will
save the image ID in a predetermined local binding called `.imageID`,
and a subsequent "publish" stage configured by the user can reference
that image as `${build.imageID}`.
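Resolving a reference such as `${build.imageID}` might look roughly like the sketch below. Several details are assumptions made for illustration: the `resolve` function, the layout of the `outputs` map, and the characters allowed in node and binding names. The sketch treats an unbound reference as an error rather than leaving it in place.

```groovy
import java.util.regex.Pattern

// Resolve ${<node>.<binding>} references in a user-supplied string against
// the outputs each node has bound. The map layout and the allowed name
// characters are assumptions for this sketch.
String resolve(String template, Map outputs) {
    Pattern reference = Pattern.compile('\\$\\{([A-Za-z0-9_-]+)\\.([A-Za-z0-9_]+)\\}')
    template.replaceAll(reference) { String whole, String node, String name ->
        Map bindings = outputs[node]
        if (bindings == null || !bindings.containsKey(name)) {
            // Fail loudly rather than leave an unresolved reference behind.
            throw new NoSuchElementException("unbound reference ${whole}")
        }
        bindings[name] as String
    }
}

def outputs = [build: [imageID: "sha256:0123abcd"]]   // hypothetical outputs
println resolve('publish ${build.imageID}', outputs)
```

Single quotes matter in Groovy here. A double-quoted `"${build.imageID}"` would be interpolated by Groovy itself before `resolve` ever saw it.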
Once a value is bound to a name, that name cannot be reused; bindings
are immutable. Node contexts are only allowed to access namespaces for
nodes that precede them in the same branch of the graph, ensuring
deterministic behavior during parallel graph branch execution. See the
unit tests for `ExecutionContext` for details on expected behavior.
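The branch restriction can be expressed as an ancestor check, similar to what `ExecutionGraph.ancestorsOf()` computes. In this hypothetical sketch, `ScopedReader` and `canRead` are illustrative names. A node may read another node's namespace only if that node is one of its ancestors. So `c` cannot read outputs of `y`, which may be running concurrently with it.

```groovy
// Hypothetical sketch: a node may read another node's namespace only if that
// node is one of its ancestors, i.e. it is guaranteed to have finished first.
class ScopedReader {
    // node -> direct predecessors, shaped like ExecutionGraph's recession map
    private final Map predecessors

    ScopedReader(Map predecessors) {
        this.predecessors = predecessors
    }

    // Iterative walk over predecessors, collecting every ancestor once.
    Set ancestorsOf(node) {
        Set found = [] as Set
        List pending = new ArrayList(predecessors[node] ?: [])
        while (pending) {
            def candidate = pending.remove(0)
            if (found.add(candidate)) {
                pending.addAll(predecessors[candidate] ?: [])
            }
        }
        found
    }

    boolean canRead(reader, producer) {
        producer in ancestorsOf(reader)
    }
}

// Graph from the example: a → b, x → b, b → c, b → y, c → z, y → z
def reader = new ScopedReader([
    b: ["a", "x"] as Set,
    c: ["b"] as Set,
    y: ["b"] as Set,
    z: ["c", "y"] as Set,
])
println reader.canRead("c", "y")   // false: c and y are on parallel branches
```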
Put together, these two data structures can constitute an execution
"stack" of sorts that can be safely mapped to Jenkins Pipeline stages
and can make use of parallel execution for graph branches. Specifically,
the `ExecutionGraph.stack()` method yields the graph's stack "frames" in
topological sort order; the nodes within each frame are independent of
one another and can safely be scheduled to run in parallel.
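Inside Jenkins, each frame would presumably become a `parallel` step, which takes a map of branch names to closures. The next frame would start only after that step returns. The `parallel` step only exists within a Pipeline run, so the sketch below substitutes a `java.util.concurrent` thread pool to show the same barrier-per-frame shape. `runFrames` is a hypothetical name, and the pool size of 4 is arbitrary.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Run the nodes of each frame concurrently, and start a frame only after every
// node of the previous frame has finished. A thread pool stands in for the
// Jenkins `parallel` step here; pool size 4 is an arbitrary choice.
List runFrames(List frames, Closure work) {
    ExecutorService pool = Executors.newFixedThreadPool(4)
    List completed = Collections.synchronizedList([])
    try {
        frames.each { List frame ->
            List<Future> futures = frame.collect { node ->
                pool.submit({ completed << work.call(node) } as Callable)
            }
            // Barrier: get() blocks until each task ends and rethrows failures.
            futures*.get()
        }
    } finally {
        pool.shutdown()
    }
    completed
}

def order = runFrames([["a", "x"], ["b"], ["c", "y"], ["z"]]) { node ->
    // a real stage would build, test, publish or deploy here
    node
}
println order
```

Completion order within a frame is nondeterministic. However, no node of a frame can complete before every node of the preceding frame has.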
Bug: T210267
Change-Id: Ic5d01bf54c703eaf14434a36f1e2b3e276b48b6f
- package org.wikimedia.integration
-
- import com.cloudbees.groovy.cps.NonCPS
-
- /**
-  * Represents a directed acyclic graph (DAG) for defining pipeline stage
-  * dependencies and scheduling them in a parallel topological-sort order.
-  *
-  * An {@link ExecutionGraph} is constructed by passing sets of arcs (aka
-  * edges/branches) that may or may not intersect on common nodes.
-  *
-  * @example
-  * A graph such as:
-  * <pre><code>
-  *   a   w   z
-  *    ⇘ ⇙
-  *     b   x
-  *    ⇙ ⇘ ⇙
-  *   c   y
-  *    ⇘ ⇙
-  *     d
-  * </code></pre>
-  *
-  * Can be constructed any number of ways as long as all the arcs are
-  * represented in the given sets.
-  *
-  * <pre><code>
-  * def graph = new ExecutionGraph([
-  *   ["a", "b", "c", "d"], // defines edges a → b, b → c, c → d
-  *   ["w", "b", "y"],      // defines edges w → b, b → y
-  *   ["x", "y", "d"],      // defines edges x → y, y → d
-  *   ["z"],                // defines no edges, only an isolated node
-  * ])
-  * </code></pre>
-  *
-  * @example
-  * The same graph could also be constructed this way.
-  *
-  * <pre><code>
-  * def graph = new ExecutionGraph([
-  *   ["a", "b", "y"],
-  *   ["w", "b", "c", "d"],
-  *   ["x", "y", "d"],
-  *   ["z"],
-  * ])
-  * </code></pre>
-  *
-  * @example
-  * {@link ExecutionGraph#stack()} will return concurrent "frames" of the graph
-  * in a topological sort order, meaning that nodes are always traversed before
-  * any of their successor nodes, and nodes of independent branches can be
-  * scheduled in parallel.
-  *
-  * For the first construction of the example graph:
-  *
-  * <pre><code>
-  * graph.stack().each { println it.join("|") }
-  * </code></pre>
-  *
-  * Would output (x has no predecessors, so it is scheduled with the roots):
-  *
-  * <pre>
-  * a|w|x|z
-  * b
-  * c|y
-  * d
-  * </pre>
-  */
- class ExecutionGraph implements Serializable {
-   /**
-    * Map of graph progression, nodes and their successor (out) nodes.
-    *
-    * @example
-    * An example graph and its <code>progression</code>.
-    *
-    * <pre><code>
-    *   a   w   z    [
-    *    ⇘ ⇙           a:[b], w:[b],
-    *     b   x
-    *    ⇙ ⇘ ⇙         b:[c, y], x:[y],
-    *   c   y
-    *    ⇘ ⇙           c:[d], y:[d],
-    *     d          ]
-    * </code></pre>
-    */
-   protected Map progression
-
-   /**
-    * Map of graph recession, nodes and their predecessor (in) nodes. Allows
-    * for efficient backward traversal.
-    *
-    * @example
-    * An example graph and its <code>recession</code>.
-    *
-    * <pre><code>
-    *   a   w   z    [
-    *    ⇘ ⇙           b:[a, w],
-    *     b   x
-    *    ⇙ ⇘ ⇙         c:[b], y:[b, x],
-    *   c   y
-    *    ⇘ ⇙           d:[c, y],
-    *     d          ]
-    * </code></pre>
-    */
-   protected Map recession
-
-   /**
-    * Set of graph isolates, nodes that are unconnected from all other nodes.
-    */
-   protected Set isolates
-
-   /**
-    * Constructs a directed execution graph using the given sets of edge
-    * sequences (arcs).
-    *
-    * @example
-    * See {@link ExecutionGraph} for examples.
-    */
-   ExecutionGraph(List arcs) {
-     progression = [:]
-     recession = [:]
-     isolates = [] as Set
-
-     arcs.each { addArc(it as List) }
-   }
-
-   /**
-    * All ancestors of (nodes eventually leading to) the given node.
-    */
-   Set ancestorsOf(node) {
-     def parents = inTo(node)
-
-     parents + parents.inject([] as Set) { ancestors, parent -> ancestors + ancestorsOf(parent) }
-   }
-
-   /**
-    * Whether the given graph is equal to this one.
-    */
-   boolean equals(ExecutionGraph other) {
-     progression == other.progression && isolates == other.isolates
-   }
-
-   /**
-    * Returns all nodes that have no outgoing edges.
-    */
-   Set leaves() {
-     (recession.keySet() - progression.keySet()) + isolates
-   }
-
-   /**
-    * The number of nodes that lead directly to the given one.
-    */
-   int inDegreeOf(node) {
-     inTo(node).size()
-   }
-
-   /**
-    * The nodes that lead directly to the given one.
-    */
-   Set inTo(node) {
-     recession[node] ?: [] as Set
-   }
-
-   /**
-    * All nodes in the graph.
-    */
-   Set nodes() {
-     progression.keySet() + recession.keySet() + isolates
-   }
-
-   /**
-    * Returns a union of this graph and the given one.
-    */
-   ExecutionGraph or(ExecutionGraph other) {
-     // Start from an empty graph; the only constructor takes a list of arcs.
-     def newGraph = new ExecutionGraph([])
-
-     [this, other].each { source ->
-       source.progression.each { newGraph.addSuccession(it.key, it.value) }
-       source.isolates.each { newGraph.addIsolate(it) }
-     }
-
-     newGraph
-   }
-
-   /**
-    * The number of nodes the given one directly leads to.
-    */
-   int outDegreeOf(node) {
-     outOf(node).size()
-   }
-
-   /**
-    * The nodes the given one directly leads to.
-    */
-   Set outOf(node) {
-     progression[node] ?: [] as Set
-   }
-
-   /**
-    * Returns a concatenation of this graph and the given one: every leaf of
-    * this graph is connected to every root of the other graph.
-    */
-   ExecutionGraph plus(ExecutionGraph other) {
-     def newGraph = this | other
-     def otherRoots = other.roots()
-
-     // With no roots to connect to, adding a succession with no successors
-     // would wrongly register each leaf as a predecessor and drop it from
-     // leaves() and isolates.
-     if (otherRoots) {
-       leaves().each { leaf ->
-         newGraph.addSuccession(leaf, otherRoots)
-       }
-     }
-
-     newGraph
-   }
-
-   /**
-    * Returns all nodes that have no incoming edges.
-    */
-   Set roots() {
-     (progression.keySet() - recession.keySet()) + isolates
-   }
-
-   /**
-    * Returns the concurrent node "frames" of the graph in a topological sort
-    * order. See {@link ExecutionGraph} for examples. A {@link RuntimeException}
-    * will be thrown in the event a graph cycle is detected.
-    */
-   List stack() throws RuntimeException {
-     def concurrentFrames = []
-
-     def graphSize = nodes().size()
-     def traversed = [] as Set
-     def prevNodes
-
-     while (traversed.size() < graphSize) {
-       def nextNodes
-
-       if (!prevNodes) {
-         // The first frame is every node without predecessors.
-         nextNodes = roots()
-       } else {
-         nextNodes = [] as Set
-
-         // A successor is ready once all of its predecessors are traversed.
-         prevNodes.each { prev ->
-           outOf(prev).each { outNode ->
-             if ((inTo(outNode) - traversed).isEmpty()) {
-               nextNodes.add(outNode)
-             }
-           }
-         }
-       }
-
-       // Untraversed nodes remain but none are ready: they form a cycle.
-       if (!nextNodes && traversed.size() < graphSize) {
-         throw new RuntimeException("cycle detected in graph (${this})")
-       }
-
-       traversed.addAll(nextNodes)
-       prevNodes = nextNodes
-       concurrentFrames.add(nextNodes as List)
-     }
-
-     concurrentFrames
-   }
-
-   /**
-    * A string representation of the graph compatible with <code>dot</code>.
-    *
-    * @example
-    * Render the graph with dot:
-    *
-    * <pre><code>
-    * $ echo "[graph.toString() value]" | dot -Tsvg > graph.svg
-    * </code></pre>
-    */
-   String toString() {
-     def allEdges = progression.inject([]) { edges, predecessor, successors ->
-       edges + successors.collect { successor ->
-         "${predecessor} -> ${successor}"
-       }
-     }
-
-     'digraph { ' + (allEdges + isolates).join("; ") + ' }'
-   }
-
-   /**
-    * Appends a new arc of nodes to the graph.
-    *
-    * @example
-    * An existing graph:
-    * <pre><code>
-    *   a
-    *    ⇘
-    *     b
-    *      ⇘
-    *       c
-    * </code></pre>
-    *
-    * Appended with <code>graph.addArc(["x", "b", "y", "z"])</code> becomes:
-    * <pre><code>
-    *   a   x
-    *    ⇘ ⇙
-    *     b
-    *    ⇙ ⇘
-    *   y   c
-    *    ⇘
-    *     z
-    * </code></pre>
-    */
-   @NonCPS
-   protected void addArc(List arc) {
-     if (arc.size() == 1) {
-       // A single-node arc contributes no edges, only an isolate.
-       addIsolate(arc[0])
-     } else {
-       // Each consecutive pair of nodes in the arc becomes an edge.
-       arc.eachWithIndex { node, i ->
-         if (i < (arc.size() - 1)) {
-           addSuccession(node, [arc[i+1]])
-         }
-       }
-     }
-   }
-
-   @NonCPS
-   protected void addIsolate(isolate) {
-     isolates.add(isolate)
-   }
-
-   /**
-    * Records an edge from the predecessor to each successor in both the
-    * progression and recession maps; connected nodes are no longer isolates.
-    */
-   @NonCPS
-   protected void addSuccession(predecessor, successors) {
-     if (!progression.containsKey(predecessor)) {
-       progression[predecessor] = [] as Set
-     }
-
-     progression[predecessor].addAll(successors)
-
-     successors.each { successor ->
-       if (!recession.containsKey(successor)) {
-         recession[successor] = [] as Set
-       }
-
-       recession[successor].add(predecessor)
-     }
-
-     isolates -= (successors + predecessor)
-   }
- }
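The one-line descriptions of `or()` (`|`) and `plus()` (`+`) can be illustrated with a trimmed-down stand-in. `MiniGraph` is a hypothetical class that keeps only a node set and an edge set. It sketches the union and concatenation semantics and is not the committed implementation. Union keeps every node and edge of both graphs. Concatenation additionally links each leaf of the left graph to each root of the right graph.

```groovy
// Hypothetical stand-in for ExecutionGraph that keeps only a node set and an
// edge set, to illustrate union (|) and concatenation (+).
class MiniGraph {
    Set nodes = [] as Set
    Set edges = [] as Set   // two-element lists [from, to]

    MiniGraph(List arcs) {
        arcs.each { List arc ->
            nodes.addAll(arc)
            arc.collate(2, 1, false).each { pair -> edges << pair }
        }
    }

    // Nodes without incoming edges.
    Set roots() { nodes - edges.collect { it[1] } }

    // Nodes without outgoing edges.
    Set leaves() { nodes - edges.collect { it[0] } }

    // Union: every node and edge of both graphs.
    MiniGraph or(MiniGraph other) {
        def union = new MiniGraph([])
        union.nodes = nodes + other.nodes
        union.edges = edges + other.edges
        union
    }

    // Concatenation: the union, plus an edge from every leaf of this graph
    // to every root of the other one.
    MiniGraph plus(MiniGraph other) {
        def result = this | other
        leaves().each { leaf ->
            other.roots().each { root -> result.edges << [leaf, root] }
        }
        result
    }
}

def left = new MiniGraph([["a", "b"], ["c"]])
def right = new MiniGraph([["x", "y"], ["z"]])
def joined = left + right
println joined.edges
```

In this example the isolate `c` is both a root and a leaf of `left`. It therefore gains edges to `x` and `z` yet remains a root of the joined graph.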