pipeline: Directed graph execution model

Our current pipelinelib-based jobs require repos to conform to a number of rigid conventions: the repo contains source for only a single application; the job builds the "test" variant, runs the "test" variant, builds the "production" variant, runs helm deploy/test, and publishes under a single tag name. These jobs also assume that all of these operations need to be performed linearly.

While this design was sufficient for our very first use cases, its reliance on conventions is already proving prohibitively inflexible. For example, teams maintaining repos that contain multiple interrelated applications cannot build and test these applications as independent images; teams wanting to execute multiple test suites would have to wrap them in a single entrypoint and implement their own concurrency should they need it; etc.

Instead of Release Engineering maintaining a new specialized pipeline job for each team that performs only slightly different permutations of the same operations (resulting in duplicated job definitions and a large maintenance burden), we can establish a configuration format and interface by which teams provide their own pipeline compositions.

This initial commit in a series of pipeline-related commits implements two fundamental components to support a CI/CD pipeline that can execute any number of user-defined variant build/test/publish/deploy stages and steps in a safely concurrent model: a directed-graph-based execution model, and name bindings for stage outputs. The former provides the model for composing stage execution, and the latter provides a decoupled system for defining which outputs each subsequent stage operates upon.

First, an `ExecutionGraph` class that can represent a directed acyclic graph given a number of linearly defined arcs (aka branches/edges). This component will allow users to provide the overall execution flow as separate linear processes, while allowing parallel branches of the execution graph to be scheduled concurrently.

Example:

    /* To represent a graph with separate parallel branches like:
     *
     *  a   x
     *   ⇘ ⇙
     *    b
     *   ⇙ ⇘
     *  y   c
     *   ⇘ ⇙
     *    z
     *
     * One only needs to provide each linear execution arc
     */
    def graph = new ExecutionGraph([["a", "b", "c", "z"], ["x", "b", "y", "z"]])

    /* The ExecutionGraph can solve how those arcs intersect and how the
     * nodes can be scheduled with the degree of concurrency that Jenkins
     * allows.
     */
    graph.stack() // => [["a", "x"], ["b"], ["c", "y"], ["z"]]

Second, a set of context classes for managing immutable global and local name/value bindings between nodes in the graph. Effectively this will provide a way for pipeline stages to safely and deterministically consume inputs from previous stages along the same branch, and to provide their own outputs for subsequent stages to consume.

For example, a stage called "build" that builds a container image will save the image ID in a predetermined local binding called `.imageID`, and a subsequent "publish" stage configured by the user can reference that image by `${build.imageID}`.

Once a value is bound to a name, that name cannot be reused; bindings are immutable. Node contexts are only allowed to access namespaces for nodes that precede them in the same branch of the graph, ensuring deterministic behavior during parallel graph branch execution. See the unit tests for `ExecutionContext` for details on expected behavior.

Put together, these two data structures can constitute an execution "stack" of sorts that can be safely mapped to Jenkins Pipeline stages and make use of parallel execution for graph branches. Specifically, the `ExecutionGraph.stack()` method yields each set of independent stack "frames" in topological sort order; the nodes within each frame can safely be scheduled to run in parallel.

Bug: T210267
Change-Id: Ic5d01bf54c703eaf14434a36f1e2b3e276b48b6f
5 years ago
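The immutable, branch-scoped name bindings described above can be illustrated with a small standalone sketch. `BindingStore`, `bind` and `lookup` are hypothetical names and are not the `ExecutionContext` API, whose source is not shown here. The sketch makes two further assumptions. First, the graph is supplied as a map from each node to its direct predecessors, much like `ExecutionGraph`'s `recession`. Second, references take the form `node.name`.

```groovy
// Hypothetical sketch of write-once, branch-scoped bindings; BindingStore,
// bind() and lookup() are illustrative names, not pipelinelib's API.
class BindingStore {
  // node -> (binding name -> value); each name can be bound only once
  private final Map<String, Map<String, Object>> namespaces = [:]
  // node -> its direct predecessors (assumed shape, like `recession`)
  private final Map<String, Set<String>> predecessors

  BindingStore(Map<String, Set<String>> predecessors) {
    this.predecessors = predecessors
  }

  // Binds a name in the node's own namespace; rebinding is an error.
  void bind(String node, String name, Object value) {
    Map<String, Object> ns = namespaces.get(node)
    if (ns == null) {
      ns = [:]
      namespaces.put(node, ns)
    }
    if (ns.containsKey(name)) {
      throw new IllegalStateException("${node}.${name} is already bound")
    }
    ns.put(name, value)
  }

  // Every node that eventually leads to the given one (breadth-first walk).
  Set<String> ancestorsOf(String node) {
    Set<String> result = [] as Set
    Deque<String> pending = new ArrayDeque<String>(predecessors.getOrDefault(node, [] as Set))
    while (!pending.isEmpty()) {
      String parent = pending.poll()
      if (result.add(parent)) {
        pending.addAll(predecessors.getOrDefault(parent, [] as Set))
      }
    }
    result
  }

  // Resolves "owner.name" on behalf of `fromNode`: only the node itself and
  // its ancestors are visible, never a node on a parallel branch.
  Object lookup(String fromNode, String ref) {
    def (owner, name) = ref.tokenize('.')
    if (owner != fromNode && !ancestorsOf(fromNode).contains(owner)) {
      throw new IllegalArgumentException("${fromNode} cannot access ${ref}")
    }
    Map<String, Object> ns = namespaces.get(owner)
    if (ns == null || !ns.containsKey(name)) {
      throw new NoSuchElementException("${ref} is not bound")
    }
    ns.get(name)
  }
}

// build leads to test and publish (parallel branches), which both lead to deploy
def store = new BindingStore([test: ["build"] as Set, publish: ["build"] as Set,
                              deploy: ["test", "publish"] as Set])
store.bind("build", "imageID", "sha256:abc")
println store.lookup("publish", "build.imageID") // sha256:abc
```

Rebinding a name throws. A stage also cannot read outputs from a stage on a parallel branch: here `publish` cannot see `test`'s bindings, though `deploy` can see both. This restriction is what keeps parallel execution deterministic.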
package org.wikimedia.integration

import com.cloudbees.groovy.cps.NonCPS

/**
 * Represents a directed acyclic graph (DAG) for defining pipeline stage
 * dependencies and scheduling them in a parallel topological-sort order.
 *
 * An {@link ExecutionGraph} is constructed by passing sets of arcs (aka
 * edges/branches) that may or may not intersect on common nodes.
 *
 * @example
 * A graph such as:
 * <pre><code>
 *   a   w       z
 *    ⇘ ⇙
 *     b   x
 *    ⇙ ⇘ ⇙
 *   c   y
 *    ⇘ ⇙
 *     d
 * </code></pre>
 *
 * Can be constructed any number of ways as long as all the arcs are
 * represented in the given sets.
 *
 * <pre><code>
 * def graph = new ExecutionGraph([
 *   ["a", "b", "c", "d"], // defines edges a → b, b → c, c → d
 *   ["w", "b", "y"],      // defines edges w → b, b → y
 *   ["x", "y", "d"],      // defines edges x → y, y → d
 *   ["z"],                // defines no edges, only an isolated node
 * ])
 * </code></pre>
 *
 * @example
 * The same graph could also be constructed this way:
 *
 * <pre><code>
 * def graph = new ExecutionGraph([
 *   ["a", "b", "y"],
 *   ["w", "b", "c", "d"],
 *   ["x", "y", "d"],
 *   ["z"],
 * ])
 * </code></pre>
 *
 * @example
 * {@link ExecutionGraph#stack()} will return concurrent "frames" of the graph
 * in a topological sort order, meaning that nodes are always traversed before
 * any of their successor nodes, and nodes of independent branches can be
 * scheduled in parallel.
 *
 * For the same example graph:
 *
 * <pre><code>
 * graph.stack().each { println it.join("|") }
 * </code></pre>
 *
 * would output the following (note that <code>x</code> has no incoming
 * edges, so it is a root and is scheduled in the first frame):
 *
 * <pre>
 * a|w|x|z
 * b
 * c|y
 * d
 * </pre>
 */
class ExecutionGraph implements Serializable {
  /**
   * Map of graph progression, nodes and their successor (out) nodes.
   *
   * @example
   * An example graph and its <code>progression</code>.
   *
   * <pre><code>
   *   a   w       z    [
   *    ⇘ ⇙               a:[b], w:[b],
   *     b   x
   *    ⇙ ⇘ ⇙             b:[c, y], x:[y],
   *   c   y
   *    ⇘ ⇙               c:[d], y:[d],
   *     d              ]
   * </code></pre>
   */
  protected Map progression

  /**
   * Map of graph recession, nodes and their predecessor (in) nodes. Allows
   * for efficient backward traversal.
   *
   * @example
   * An example graph and its <code>recession</code>.
   *
   * <pre><code>
   *   a   w       z    [
   *    ⇘ ⇙               b:[a, w],
   *     b   x
   *    ⇙ ⇘ ⇙             c:[b], y:[b, x],
   *   c   y
   *    ⇘ ⇙               d:[c, y],
   *     d              ]
   * </code></pre>
   */
  protected Map recession

  /**
   * Set of graph isolates, nodes that are unconnected from all other nodes.
   */
  protected Set isolates

  /**
   * Constructs a directed execution graph using the given sets of edge
   * sequences (arcs).
   *
   * @example
   * See {@link ExecutionGraph} for examples.
   */
  ExecutionGraph(List arcs) {
    progression = [:]
    recession = [:]
    isolates = [] as Set

    arcs.each { addArc(it as List) }
  }

  /**
   * All ancestors of (nodes eventually leading to) the given node.
   */
  Set ancestorsOf(node) {
    def parents = inTo(node)

    parents + parents.inject([] as Set) { ancestors, parent -> ancestors + ancestorsOf(parent) }
  }

  /**
   * Whether the given graph is equal to this one.
   */
  boolean equals(ExecutionGraph other) {
    progression == other.progression && isolates == other.isolates
  }

  /**
   * Returns all nodes that have no outgoing edges.
   */
  Set leaves() {
    (recession.keySet() - progression.keySet()) + isolates
  }

  /**
   * The number of nodes that lead directly to the given one.
   */
  int inDegreeOf(node) {
    inTo(node).size()
  }

  /**
   * The nodes that lead directly to the given one.
   */
  Set inTo(node) {
    recession[node] ?: [] as Set
  }

  /**
   * All nodes in the graph.
   */
  Set nodes() {
    progression.keySet() + recession.keySet() + isolates
  }

  /**
   * Returns a union of this graph and the given one.
   */
  ExecutionGraph or(ExecutionGraph other) {
    // Start from an empty graph; the only constructor takes a list of arcs
    def newGraph = new ExecutionGraph([])

    [this, other].each { source ->
      source.progression.each { newGraph.addSuccession(it.key, it.value) }
      source.isolates.each { newGraph.addIsolate(it) }
    }

    newGraph
  }

  /**
   * The number of nodes the given one directly leads to.
   */
  int outDegreeOf(node) {
    outOf(node).size()
  }

  /**
   * The nodes the given one directly leads to.
   */
  Set outOf(node) {
    progression[node] ?: [] as Set
  }

  /**
   * Returns a concatenation of this graph and the given one: the union of
   * both graphs, in which every leaf of this graph leads to every root of
   * the given one.
   */
  ExecutionGraph plus(ExecutionGraph other) {
    def newGraph = this | other

    leaves().each { leaf ->
      newGraph.addSuccession(leaf, other.roots())
    }

    newGraph
  }

  /**
   * Returns all nodes that have no incoming edges.
   */
  Set roots() {
    (progression.keySet() - recession.keySet()) + isolates
  }

  /**
   * Returns the concurrent node "frames" of the graph in topological sort
   * order. See {@link ExecutionGraph} for examples. A {@link RuntimeException}
   * is thrown if a cycle is detected in the graph.
   */
  List stack() throws RuntimeException {
    def concurrentFrames = []
    def graphSize = nodes().size()
    def traversed = [] as Set
    def prevNodes

    while (traversed.size() < graphSize) {
      def nextNodes

      if (!prevNodes) {
        // The first frame consists of every node without predecessors
        nextNodes = roots()
      } else {
        // A successor of the previous frame is ready once all of its
        // predecessors have been traversed
        nextNodes = [] as Set

        prevNodes.each { prev ->
          outOf(prev).each { outNode ->
            if ((inTo(outNode) - traversed).isEmpty()) {
              nextNodes.add(outNode)
            }
          }
        }
      }

      // No ready nodes while some remain untraversed means a cycle
      if (!nextNodes && traversed.size() < graphSize) {
        throw new RuntimeException("cycle detected in graph (${this})")
      }

      traversed.addAll(nextNodes)
      prevNodes = nextNodes
      concurrentFrames.add(nextNodes as List)
    }

    concurrentFrames
  }

  /**
   * A string representation of the graph compatible with <code>dot</code>.
   *
   * @example
   * Render the graph with dot:
   *
   * <pre><code>
   * $ echo "[graph.toString() value]" | dot -Tsvg &gt; graph.svg
   * </code></pre>
   */
  String toString() {
    def allEdges = progression.inject([]) { edges, predecessor, successors ->
      edges + successors.collect { successor ->
        "${predecessor} -> ${successor}"
      }
    }

    'digraph { ' + (allEdges + isolates).join("; ") + ' }'
  }

  /**
   * Appends a new arc of nodes to the graph.
   *
   * @example
   * Given an existing graph:
   * <pre><code>
   *   a
   *   ⇓
   *   b
   *   ⇓
   *   c
   * </code></pre>
   *
   * Appending the arc with <code>addArc(["x", "b", "y", "z"])</code> yields:
   * <pre><code>
   *   a   x
   *    ⇘ ⇙
   *     b
   *    ⇙ ⇘
   *   y   c
   *   ⇓
   *   z
   * </code></pre>
   */
  @NonCPS
  protected void addArc(List arc) {
    if (arc.size() == 1) {
      // A single-node arc defines no edges, only an isolated node
      addIsolate(arc[0])
    } else {
      // Link each node to the one that follows it in the arc
      arc.eachWithIndex { node, i ->
        if (i < (arc.size() - 1)) {
          addSuccession(node, [arc[i+1]])
        }
      }
    }
  }
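
  /**
   * Records a node that is unconnected from all other nodes. A node that
   * already has edges in the graph is not recorded as an isolate.
   */
  @NonCPS
  void addIsolate(isolate) {
    if (!progression.containsKey(isolate) && !recession.containsKey(isolate)) {
      isolates.add(isolate)
    }
  }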

  /**
   * Adds an edge from the given predecessor to each of the given successors,
   * and removes all of those nodes from the set of isolates.
   */
  @NonCPS
  void addSuccession(predecessor, successors) {
    if (!progression.containsKey(predecessor)) {
      progression[predecessor] = [] as Set
    }

    progression[predecessor].addAll(successors)

    successors.each { successor ->
      if (!recession.containsKey(successor)) {
        recession[successor] = [] as Set
      }

      recession[successor].add(predecessor)
    }

    isolates -= (successors + predecessor)
  }
}
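The layering that `stack()` performs, and the way its frames can run in parallel, can be sketched outside Jenkins. This standalone version makes two substitutions. It uses Kahn-style in-degree counting instead of the set differences that `stack()` recomputes at each step. It also uses `java.util.concurrent` in place of the Jenkins Pipeline `parallel` step. `frames` and `runFrames` are hypothetical helpers that take a successor map shaped like `progression`, and the thread-pool size of 4 is arbitrary.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper: groups nodes into concurrent "frames" by in-degree
// counting (Kahn's algorithm). A node becomes ready once every one of its
// predecessors sits in an earlier frame.
List<List<String>> frames(Map<String, Set<String>> successors, Set<String> isolates) {
  Map<String, Integer> inDegree = [:]
  successors.each { node, outs ->
    inDegree.putIfAbsent(node, 0)
    outs.each { out -> inDegree[out] = (inDegree[out] ?: 0) + 1 }
  }
  isolates.each { inDegree.putIfAbsent(it, 0) }

  List<List<String>> result = []
  List<String> ready = inDegree.findAll { it.value == 0 }.keySet() as List
  int placed = 0
  while (ready) {
    result << ready
    placed += ready.size()
    List<String> next = []
    ready.each { node ->
      successors.getOrDefault(node, [] as Set).each { out ->
        inDegree[out] = inDegree[out] - 1
        if (inDegree[out] == 0) {
          next << out
        }
      }
    }
    ready = next
  }
  // Nodes on a cycle never reach in-degree 0, so some are left unplaced
  if (placed < inDegree.size()) {
    throw new IllegalStateException("cycle detected")
  }
  result
}

// Hypothetical helper: runs the nodes of each frame concurrently and waits
// for the whole frame to finish before starting the next (a frame barrier).
void runFrames(List<List<String>> stack, Closure action) {
  ExecutorService pool = Executors.newFixedThreadPool(4)
  try {
    stack.each { frame ->
      List<Callable> tasks = frame.collect { node ->
        Callable task = { -> action(node) }
        task
      }
      // Future.get() rethrows a failed task's exception, aborting later frames
      pool.invokeAll(tasks).each { it.get() }
    }
  } finally {
    pool.shutdown()
  }
}

// The arcs [a, b, c, z] and [x, b, y, z] from the commit message example
def successors = [a: ["b"] as Set, b: ["c", "y"] as Set, c: ["z"] as Set,
                  x: ["b"] as Set, y: ["z"] as Set]
def stack = frames(successors, [] as Set)
println stack // [[a, x], [b], [c, y], [z]]
runFrames(stack) { node -> println "running ${node}" }
```

For the graphs in the documentation above, the frames match those that `stack()` produces. Only the order of the frames is meaningful. The order of nodes within a frame follows map iteration order and carries no scheduling significance.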