334 lines
9.3 KiB
Julia
334 lines
9.3 KiB
Julia
# outside interface
|
|
|
|
# Queue `operation` behind everything already recorded on `graph` and apply it.
#
# Arguments:
#   graph     - the DAG to modify
#   operation - the operation to append to the chain
#
# Afterwards the cached possible operations are flagged dirty; `get_operations`
# checks that flag and regenerates them on demand.
function push_operation!(graph::DAG, operation::Operation)
    # 1.: record the operation at the end of the pending queue
    pending = graph.operationsToApply
    push!(pending, operation)

    # 2.: drain the pending queue (the new operation is applied last)
    apply_all!(graph)

    # 3.: invalidate the cached possible operations
    graph.possibleOperations.dirty = true
end
|
|
|
|
# Reverts the latest operation on the graph, essentially like a ctrl+z for the graph.
#
# A still-pending operation is simply dropped from the end of the pending queue.
# Only when nothing is pending is the most recently applied operation popped and
# reverted. Raises an error when there is neither a pending nor an applied
# operation left.
function pop_operation!(graph::DAG)
    pending = graph.operationsToApply
    applied = graph.appliedOperations

    # nothing left to undo at all
    if isempty(pending) && isempty(applied)
        error("No more operations to pop!")
    end

    # 1.: remove the newest operation, preferring one that was never applied
    if isempty(pending)
        revert_operation!(graph, pop!(applied))
    else
        pop!(pending)
    end

    # 2.: apply whatever is still pending
    apply_all!(graph)

    # 3.: invalidate the cached possible operations
    graph.possibleOperations.dirty = true
end
|
|
|
|
# Returns `true` while at least one pending or applied operation remains on `graph`,
# i.e. while `pop_operation!` would not raise its "nothing to pop" error.
function can_pop(graph::DAG)
    return !(isempty(graph.operationsToApply) && isempty(graph.appliedOperations))
end
|
|
|
|
# Reset the graph to its initial state with no operations applied.
#
# Pops operations one at a time via `pop_operation!` until `can_pop` reports that
# neither pending nor applied operations are left.
function reset_graph!(graph::DAG)
    while can_pop(graph)
        pop_operation!(graph)
    end
    return nothing
end
|
|
|
|
# implementation detail functions, don't export
|
|
|
|
# Applies all unapplied operations in the DAG.
#
# Operations are taken from the front of `graph.operationsToApply` (oldest first),
# applied via `apply_operation!`, and the resulting applied-operation record is
# appended to `graph.appliedOperations`.
function apply_all!(graph::DAG)
    while !isempty(graph.operationsToApply)
        # oldest pending operation comes first
        next_op = popfirst!(graph.operationsToApply)

        # apply it and remember the result so it can be reverted later
        push!(graph.appliedOperations, apply_operation!(graph, next_op))
    end
    return nothing
end
|
|
|
|
|
|
# Fallback method: selected by dispatch only when `operation` has no more specific
# `apply_operation!` method, in which case it always raises an error.
apply_operation!(graph::DAG, operation::Operation) = error("Unknown operation type!")
|
|
|
|
# Applies a node fusion to `graph`.
#
# The three entries of `operation.input` are forwarded, in order, to `node_fusion!`.
# Returns an `AppliedNodeFusion` pairing the operation with the diff it produced.
function apply_operation!(graph::DAG, operation::NodeFusion)
    nodes = operation.input
    applied_diff = node_fusion!(graph, nodes[1], nodes[2], nodes[3])
    return AppliedNodeFusion(operation, applied_diff)
end
|
|
|
|
# Applies a node reduction to `graph`.
#
# Forwards the first two entries of `operation.input` to `node_reduction!` and
# returns an `AppliedNodeReduction` pairing the operation with the resulting diff.
#
# NOTE(review): only input[1] and input[2] are used here, while `generate_options`
# can create a `NodeReduction` from a vector with more than two nodes - confirm
# whether the remaining nodes are meant to be reduced as well.
function apply_operation!(graph::DAG, operation::NodeReduction)
    nodes = operation.input
    applied_diff = node_reduction!(graph, nodes[1], nodes[2])
    return AppliedNodeReduction(operation, applied_diff)
end
|
|
|
|
# Applies a node split to `graph`.
#
# `operation.input` is passed as the node to split to `node_split!`. Returns an
# `AppliedNodeSplit` pairing the operation with the diff it produced.
function apply_operation!(graph::DAG, operation::NodeSplit)
    applied_diff = node_split!(graph, operation.input)
    return AppliedNodeSplit(operation, applied_diff)
end
|
|
|
|
|
|
# Fallback method: selected by dispatch only when `operation` has no more specific
# `revert_operation!` method, in which case it always raises an error.
revert_operation!(graph::DAG, operation::AppliedOperation) = error("Unknown operation type!")
|
|
|
|
# Undoes an applied node fusion by reverting its recorded diff on `graph`.
# Returns the original (unapplied) operation stored in `operation.operation`.
function revert_operation!(graph::DAG, operation::AppliedNodeFusion)
    original = operation.operation
    revert_diff!(graph, operation.diff)
    return original
end
|
|
|
|
# Undoes an applied node reduction by reverting its recorded diff on `graph`.
# Returns the original (unapplied) operation stored in `operation.operation`.
function revert_operation!(graph::DAG, operation::AppliedNodeReduction)
    original = operation.operation
    revert_diff!(graph, operation.diff)
    return original
end
|
|
|
|
# Undoes an applied node split by reverting its recorded diff on `graph`.
# Returns the original (unapplied) operation stored in `operation.operation`.
function revert_operation!(graph::DAG, operation::AppliedNodeSplit)
    original = operation.operation
    revert_diff!(graph, operation.diff)
    return original
end
|
|
|
|
|
|
# Reverts a recorded diff on `graph`: everything the diff added is removed again and
# everything it removed is re-inserted.
#
# Arguments:
#   graph - the DAG to modify
#   diff  - object with the fields `addedEdges`, `addedNodes`, `removedNodes` and
#           `removedEdges`
#
# The order matters: added edges go before added nodes, and removed nodes come back
# before removed edges. Every helper is called with `false` as its third argument.
# NOTE(review): presumably `false` disables change tracking for these calls - confirm
# against the helper definitions.
function revert_diff!(graph::DAG, diff)
    # undo additions: edges first, then the nodes
    foreach(edge -> remove_edge!(graph, edge, false), diff.addedEdges)
    foreach(node -> remove_node!(graph, node, false), diff.addedNodes)

    # undo removals: nodes first, then the edges
    foreach(node -> insert_node!(graph, node, false), diff.removedNodes)
    foreach(edge -> insert_edge!(graph, edge, false), diff.removedEdges)

    return nothing
end
|
|
|
|
# Fuse nodes n1 -> n2 -> n3 together into one node, return the applied difference to
# the graph.
#
# Arguments:
#   graph - the DAG to modify
#   n1    - compute node that is a child of `n2`
#   n2    - data node that is a child of `n3`; it must not have more than one parent
#   n3    - compute node that is the parent of `n2`
#
# Returns the result of `get_snapshot_diff(graph)` taken after the modification.
#
# Raises an error, before the graph is modified, if any node is not part of the
# graph, if the nodes are not connected as n1 -> n2 -> n3, or if `n2` has more than
# one parent.
function node_fusion!(
    graph::DAG,
    n1::ComputeTaskNode,
    n2::DataTaskNode,
    n3::ComputeTaskNode,
)
    # clear snapshot
    get_snapshot_diff(graph)

    if !(n1 in graph) || !(n2 in graph) || !(n3 in graph)
        error("[Node Fusion] The given nodes are not part of the given graph")
    end

    if !is_child(n1, n2) || !is_child(n2, n3) || !is_parent(n3, n2) || !is_parent(n2, n1)
        # the checks are redundant but maybe a good sanity check
        error("[Node Fusion] The given nodes are not connected by edges which is required for node fusion")
    end

    # save children and parents
    n1_children = children(n1)
    n2_parents = parents(n2)
    n3_parents = parents(n3)

    if length(n2_parents) > 1
        error("[Node Fusion] The given data node has more than one parent")
    end

    required_edge1 = make_edge(n1, n2)
    required_edge2 = make_edge(n2, n3)

    # remove the edges and nodes that will be replaced by the fused node
    remove_edge!(graph, required_edge1)
    remove_edge!(graph, required_edge2)
    remove_node!(graph, n1)
    remove_node!(graph, n2)

    # get n3's children now so it automatically excludes n2
    n3_children = children(n3)
    remove_node!(graph, n3)

    # create new node with the fused compute task
    new_node = ComputeTaskNode(FusedComputeTask{typeof(n1.task), typeof(n3.task)}())
    insert_node!(graph, new_node)

    # "repoint" children of n1 to the new node
    for child in n1_children
        remove_edge!(graph, make_edge(child, n1))
        insert_edge!(graph, make_edge(child, new_node))
    end

    # "repoint" children of n3 to the new node
    for child in n3_children
        remove_edge!(graph, make_edge(child, n3))
        # n3 may share children with n1; their edge to the new node was already
        # inserted by the loop above and must not be inserted a second time
        if !(child in n1_children)
            insert_edge!(graph, make_edge(child, new_node))
        end
    end

    # "repoint" parents of n3 to the new node
    for parent in n3_parents
        remove_edge!(graph, make_edge(n3, parent))
        insert_edge!(graph, make_edge(new_node, parent))
    end

    return get_snapshot_diff(graph)
end
|
|
|
|
# Reduce `n2` into `n1`: `n2` is removed from the graph and its parents are attached
# to `n1` instead. Returns the applied difference to the graph.
#
# Arguments:
#   graph - the DAG to modify
#   n1    - the node that is kept
#   n2    - the node that is removed; must have the same type and the same set of
#           children as `n1`
#
# Returns the result of `get_snapshot_diff(graph)` taken after the modification.
#
# Raises an error, before the graph is modified, if a node is not part of the graph,
# if the nodes differ in type, or if their children differ.
function node_reduction!(graph::DAG, n1::Node, n2::Node)
    # clear snapshot
    get_snapshot_diff(graph)

    if !(n1 in graph) || !(n2 in graph)
        error("[Node Reduction] The given nodes are not part of the given graph")
    end

    if typeof(n1) != typeof(n2)
        error("[Node Reduction] The given nodes are not of the same type")
    end

    # save n2 parents and children
    n2_children = children(n2)
    n2_parents = parents(n2)

    # parents n1 already has, captured before any edge is touched
    n1_parents = parents(n1)

    if Set(n2_children) != Set(children(n1))
        error("[Node Reduction] The given nodes do not have equal prerequisite nodes which is required for node reduction")
    end

    # remove n2 and all its parents and children
    for child in n2_children
        remove_edge!(graph, make_edge(child, n2))
    end

    for parent in n2_parents
        remove_edge!(graph, make_edge(n2, parent))

        # add parents of n2 to n1, unless n1 is already connected to that parent
        # (inserting the edge again would duplicate an existing edge)
        if !(parent in n1_parents)
            insert_edge!(graph, make_edge(n1, parent))
        end
    end

    remove_node!(graph, n2)

    return get_snapshot_diff(graph)
end
|
|
|
|
# Split `n1` into one copy per parent, so that every parent gets its own copy of the
# node. Returns the applied difference to the graph.
#
# Arguments:
#   graph - the DAG to modify
#   n1    - the node to split; it must have more than one parent
#
# Each copy is connected to exactly one of the former parents of `n1` and to all of
# the former children of `n1`. The original `n1` is detached and removed.
#
# Returns the result of `get_snapshot_diff(graph)` taken after the modification.
#
# Raises an error, before the graph is modified, if `n1` is not part of the graph or
# has at most one parent.
function node_split!(graph::DAG, n1::Node)
    # clear snapshot
    get_snapshot_diff(graph)

    if !(n1 in graph)
        error("[Node Split] The given node is not part of the given graph")
    end

    n1_parents = parents(n1)
    n1_children = children(n1)

    if length(n1_parents) <= 1
        error("[Node Split] The given node does not have multiple parents which is required for node split")
    end

    # detach n1 completely: its copies take over all of its edges
    for parent in n1_parents
        remove_edge!(graph, make_edge(n1, parent))
    end
    for child in n1_children
        remove_edge!(graph, make_edge(child, n1))
    end

    # without this, n1 would stay behind as a parentless orphan next to its copies
    remove_node!(graph, n1)

    for parent in n1_parents
        n_copy = copy(n1)
        insert_node!(graph, n_copy)
        insert_edge!(graph, make_edge(n_copy, parent))

        for child in n1_children
            insert_edge!(graph, make_edge(child, n_copy))
        end
    end

    return get_snapshot_diff(graph)
end
|
|
|
|
# Generates all possible optimizations (node fusions, node reductions and node
# splits) on the graph.
#
# Pending operations are applied first. The result is stored in
# `graph.possibleOperations` with its `dirty` flag cleared, and is also returned.
# Every generated operation is additionally pushed onto the `operations` collection
# of each node it involves.
#
# NOTE(review): `node.operations` is only ever pushed to here, never cleared, so
# repeated regeneration appears to accumulate stale entries on the nodes - confirm
# whether it should be emptied before regenerating.
function generate_options(graph::DAG)
    options = PossibleOperations()

    # make sure the graph is fully generated through
    apply_all!(graph)

    # find possible node fusions
    for node in graph.nodes
        node isa DataTaskNode || continue

        # data node can only have a single parent
        node_parents = parents(node)
        length(node_parents) == 1 || continue

        # otherwise this node is an entry node or has multiple children, which
        # should not be possible
        node_children = children(node)
        length(node_children) == 1 || continue

        # `first` instead of `pop!`: read the single element without mutating the
        # collections handed out by `parents` / `children`
        parent_node = first(node_parents)
        child_node = first(node_children)

        nf = NodeFusion((child_node, node, parent_node))
        push!(options.nodeFusions, nf)
        push!(child_node.operations, nf)
        push!(node.operations, nf)
        push!(parent_node.operations, nf)
    end

    # find possible node reductions
    visitedNodes = Set{Node}()

    for node in graph.nodes
        if node in visitedNodes
            continue
        end
        push!(visitedNodes, node)

        reductionVector = nothing
        # possible reductions are with nodes that are partners, i.e. parents of children
        for partner in partners(node)
            can_reduce(node, partner) || continue

            if reductionVector === nothing
                # only when there's at least one reduction partner, create the vector
                reductionVector = Vector{Node}()
                push!(reductionVector, node)
            end

            push!(reductionVector, partner)
            push!(visitedNodes, partner)
        end

        if reductionVector !== nothing
            nr = NodeReduction(reductionVector)
            push!(options.nodeReductions, nr)
            # distinct name so the outer loop variable `node` is not shadowed
            for member in reductionVector
                push!(member.operations, nr)
            end
        end
    end

    # find possible node splits
    for node in graph.nodes
        if can_split(node)
            ns = NodeSplit(node)
            push!(options.nodeSplits, ns)
            push!(node.operations, ns)
        end
    end

    options.dirty = false
    graph.possibleOperations = options

    return options
end
|
|
|
|
# Returns the possible operations of `graph`.
#
# When the cached `graph.possibleOperations` is flagged dirty, it is regenerated
# with `generate_options` first; otherwise the cached value is returned as-is.
function get_operations(graph::DAG)
    graph.possibleOperations.dirty && generate_options(graph)
    return graph.possibleOperations
end