# outside interface

# Queue `operation` at the end of the graph's pending operations and then apply
# every pending operation. Returns `nothing`.
function push_operation!(graph::DAG, operation::Operation)
    # 1.: Add the operation to the DAG
    push!(graph.operationsToApply, operation)

    # 2.: Apply all operations in the chain
    apply_all!(graph)

    # 3.: Regenerate properties, possible operations from here
    return nothing
end

# Revert the latest operation, essentially like a ctrl+z.
# A still-pending operation is simply dropped from the queue; otherwise the most
# recently applied operation is reverted on the graph.
# Throws (via `error`) when there is neither a pending nor an applied operation.
# Returns `nothing`.
function pop_operation!(graph::DAG)
    # 1.: Remove the operation from the appliedChain of the DAG
    if !isempty(graph.operationsToApply)
        pop!(graph.operationsToApply)
    elseif !isempty(graph.appliedOperations)
        appliedOp = pop!(graph.appliedOperations)
        revert_operation!(graph, appliedOp)
    else
        error("No more operations to pop!")
    end

    # 2.: Apply all (remaining) operations in the chain
    apply_all!(graph)

    # 3.: Regenerate properties, possible operations from here
    return nothing
end

# `true` if there is at least one pending or applied operation that
# `pop_operation!` could remove.
can_pop(graph::DAG) = !isempty(graph.operationsToApply) || !isempty(graph.appliedOperations)

# Reset the graph to its initial state with no operations applied, by popping
# operations until none are left. Returns `nothing`.
function reset_graph!(graph::DAG)
    while can_pop(graph)
        pop_operation!(graph)
    end
    return nothing
end

# implementation detail functions, don't export

# Apply all unapplied operations in the DAG, in queue order (front first), and
# record each result at the end of `graph.appliedOperations`. Returns `nothing`.
function apply_all!(graph::DAG)
    while !isempty(graph.operationsToApply)
        # get next operation to apply from front of the deque
        op = popfirst!(graph.operationsToApply)

        # apply it
        appliedOp = apply_operation!(graph, op)

        # push to the end of the appliedOperations deque
        push!(graph.appliedOperations, appliedOp)
    end
    return nothing
end

# Fallback for operation types without a dedicated method: always errors.
function apply_operation!(graph::DAG, operation::Operation)
    return error("Unknown operation type!")
end

# Apply a node fusion on the three nodes stored in `operation.input` and return
# the operation bundled with the resulting graph diff.
function apply_operation!(graph::DAG, operation::NodeFusion)
    diff = node_fusion!(graph, operation.input[1], operation.input[2], operation.input[3])
    return AppliedNodeFusion(operation, diff)
end

# Apply a node reduction on the first two nodes stored in `operation.input` and
# return the operation bundled with the resulting graph diff.
function apply_operation!(graph::DAG, operation::NodeReduction)
    diff = node_reduction!(graph, operation.input[1], operation.input[2])
    return AppliedNodeReduction(operation, diff)
end

# Apply a node split on the node stored in `operation.input` and return the
# operation bundled with the resulting graph diff.
function apply_operation!(graph::DAG, operation::NodeSplit)
    diff = node_split!(graph, operation.input)
    return AppliedNodeSplit(operation, diff)
end

# Fallback for applied-operation types without a dedicated method: always errors.
function revert_operation!(graph::DAG, operation::AppliedOperation)
    return error("Unknown operation type!")
end

# Undo an applied node fusion by reverting its diff; returns the original
# (unapplied) operation.
function revert_operation!(graph::DAG, operation::AppliedNodeFusion)
    revert_diff!(graph, operation.diff)
    return operation.operation
end

# Undo an applied node reduction by reverting its diff; returns the original
# (unapplied) operation.
function revert_operation!(graph::DAG, operation::AppliedNodeReduction)
    revert_diff!(graph, operation.diff)
    return operation.operation
end

# Undo an applied node split by reverting its diff; returns the original
# (unapplied) operation.
function revert_operation!(graph::DAG, operation::AppliedNodeSplit)
    revert_diff!(graph, operation.diff)
    return operation.operation
end

# Revert a recorded diff on the graph: add removed nodes, remove added nodes,
# same for edges. Returns `nothing`.
function revert_diff!(graph::DAG, diff)
    # note the order: added edges go before added nodes, and removed nodes come
    # back before the removed edges that connect them
    for edge in diff.addedEdges
        remove_edge!(graph, edge, false)
    end
    for node in diff.addedNodes
        remove_node!(graph, node, false)
    end

    for node in diff.removedNodes
        insert_node!(graph, node, false)
    end
    for edge in diff.removedEdges
        insert_edge!(graph, edge, false)
    end
    return nothing
end

# Fuse nodes n1 -> n2 -> n3 together into one node, return the applied difference
# to the graph.
# Errors if any of the nodes is not in the graph, if they are not connected as
# n1 -> n2 -> n3, or if the data node n2 has more than one parent.
function node_fusion!(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode)
    # clear snapshot
    get_snapshot_diff(graph)

    if !(n1 in graph) || !(n2 in graph) || !(n3 in graph)
        error("[Node Fusion] The given nodes are not part of the given graph")
    end

    if !is_child(n1, n2) || !is_child(n2, n3) || !is_parent(n3, n2) || !is_parent(n2, n1)
        # the checks are redundant but maybe a good sanity check
        error("[Node Fusion] The given nodes are not connected by edges which is required for node fusion")
    end

    # save children and parents
    n1_children = children(n1)
    n2_parents = parents(n2)
    n3_parents = parents(n3)

    if length(n2_parents) > 1
        error("[Node Fusion] The given data node has more than one parent")
    end

    required_edge1 = make_edge(n1, n2)
    required_edge2 = make_edge(n2, n3)

    # remove the edges and nodes that will be replaced by the fused node
    remove_edge!(graph, required_edge1)
    remove_edge!(graph, required_edge2)
    remove_node!(graph, n1)
    remove_node!(graph, n2)

    # get n3's children now so it automatically excludes n2
    n3_children = children(n3)
    remove_node!(graph, n3)

    # create new node with the fused compute task
    new_node = ComputeTaskNode(FusedComputeTask{typeof(n1.task), typeof(n3.task)}())
    insert_node!(graph, new_node)

    # "repoint" children of n1 to the new node
    for child in n1_children
        remove_edge!(graph, make_edge(child, n1))
        insert_edge!(graph, make_edge(child, new_node))
    end

    # "repoint" children of n3 to the new node
    for child in n3_children
        remove_edge!(graph, make_edge(child, n3))
        insert_edge!(graph, make_edge(child, new_node))
    end

    # "repoint" parents of n3 from new node
    for parent in n3_parents
        remove_edge!(graph, make_edge(n3, parent))
        insert_edge!(graph, make_edge(new_node, parent))
    end

    return get_snapshot_diff(graph)
end

# Reduce n2 into n1: n2's parents are re-attached to n1 and n2 is removed from
# the graph. Returns the applied difference to the graph.
# Errors if a node is not in the graph, if the nodes differ in type, or if
# their children are not equal.
function node_reduction!(graph::DAG, n1::Node, n2::Node)
    # clear snapshot
    get_snapshot_diff(graph)

    if !(n1 in graph) || !(n2 in graph)
        error("[Node Reduction] The given nodes are not part of the given graph")
    end

    if typeof(n1) != typeof(n2)
        error("[Node Reduction] The given nodes are not of the same type")
    end

    # save n2 parents and children
    n2_children = children(n2)
    n2_parents = parents(n2)

    if n2_children != children(n1)
        error("[Node Reduction] The given nodes do not have equal prerequisite nodes which is required for node reduction")
    end

    # remove n2 and all its parents and children
    for child in n2_children
        remove_edge!(graph, make_edge(child, n2))
    end

    for parent in n2_parents
        remove_edge!(graph, make_edge(n2, parent))

        # add parents of n2 to n1
        insert_edge!(graph, make_edge(n1, parent))
    end

    remove_node!(graph, n2)

    return get_snapshot_diff(graph)
end

# Split n1 so that each of its parents gets its own copy of n1, with every copy
# connected to all of n1's children. Returns the applied difference to the graph.
# Errors if n1 is not in the graph or has at most one parent.
# NOTE(review): n1 itself and the edges from its children to n1 are not removed
# here, so n1 stays in the graph without parents -- confirm this is intended.
function node_split!(graph::DAG, n1::Node)
    # clear snapshot
    get_snapshot_diff(graph)

    if !(n1 in graph)
        error("[Node Split] The given node is not part of the given graph")
    end

    n1_parents = parents(n1)
    n1_children = children(n1)

    if length(n1_parents) <= 1
        error("[Node Split] The given node does not have multiple parents which is required for node split")
    end

    for parent in n1_parents
        n_copy = copy(n1)
        insert_node!(graph, n_copy)
        insert_edge!(graph, make_edge(n_copy, parent))
        remove_edge!(graph, make_edge(n1, parent))

        for child in n1_children
            insert_edge!(graph, make_edge(child, n_copy))
        end
    end

    return get_snapshot_diff(graph)
end

# Generate all possible optimizations on the graph.
# Applies any pending operations first, then collects the possible node fusions,
# node reductions and node splits into a fresh `PossibleOperations`, which is
# returned.
function generate_options(graph::DAG)
    options = PossibleOperations()

    # make sure the graph is fully generated through
    apply_all!(graph)

    # find possible node fusions
    for node in graph.nodes
        node isa DataTaskNode || continue

        node_parents = parents(node)
        if length(node_parents) != 1
            # data node can only have a single parent
            continue
        end
        # `first` instead of `pop!`: reads the single element without mutating
        # the collection handed out by `parents`
        parent_node = first(node_parents)

        node_children = children(node)
        if length(node_children) != 1
            # this node is an entry node or has multiple children which should
            # not be possible
            continue
        end
        child_node = first(node_children)

        push!(options.nodeFusions, NodeFusion((child_node, node, parent_node)))
    end

    # find possible node reductions
    visitedNodes = Set{Node}()

    for node in graph.nodes
        if node in visitedNodes
            continue
        end

        push!(visitedNodes, node)

        # stays `nothing` until the first reducible partner is found
        reductionVector = nothing
        # possible reductions are with nodes that are partners, i.e. parents of
        # children
        for partner in partners(node)
            if can_reduce(node, partner)
                if reductionVector === nothing
                    # only when there's at least one reduction partner, insert
                    # the vector; it is registered in `options` right away and
                    # further partners are appended to the same vector below
                    reductionVector = Node[node]
                    push!(options.nodeReductions, NodeReduction(reductionVector))
                end

                push!(reductionVector, partner)
                push!(visitedNodes, partner)
            end
        end
    end

    # find possible node splits
    for node in graph.nodes
        if can_split(node)
            push!(options.nodeSplits, NodeSplit(node))
        end
    end

    return options
end