Rework graph state

This commit is contained in:
2023-06-27 18:38:26 +02:00
parent 669444ebbd
commit 80a3912e3e
7 changed files with 358 additions and 139 deletions

View File

@ -16,7 +16,7 @@ include("utility.jl")
export Node, Edge, ComputeTaskNode, DataTaskNode, DAG
export AbstractTask, AbstractComputeTask, AbstractDataTask, DataTask, ComputeTaskP, ComputeTaskS1, ComputeTaskS2, ComputeTaskV, ComputeTaskU, ComputeTaskSum, FusedComputeTask
export make_node, make_edge, insert_node, insert_edge, is_entry_node, is_exit_node, parents, children, compute, graph_properties, get_exit_node, is_valid
export node_fusion, node_reduction, node_split, generate_options
export NodeFusion, NodeReduction, NodeSplit, push_operation!, pop_operation!, generate_options
export import_txt
export ==, in, show

View File

@ -1,7 +1,88 @@
using DataStructures
const Diff = NamedTuple{
(:addedNodes, :removedNodes, :addedEdges, :removedEdges),
Tuple{Vector{Node}, Vector{Node}, Vector{Edge}, Vector{Edge}}
}
function Diff()
return (
addedNodes = Vector{Node}(),
removedNodes = Vector{Node}(),
addedEdges = Vector{Edge}(),
removedEdges = Vector{Edge}()
)::Diff
end
# An abstract base class for operations
# an operation can be applied to a DAG
abstract type Operation end
# An abstract base class for already applied operations
# an applied operation can be reversed iff it is the last applied operation on the DAG
abstract type AppliedOperation end
struct NodeFusion <: Operation
input::Tuple{ComputeTaskNode,DataTaskNode,ComputeTaskNode}
end
struct AppliedNodeFusion <: AppliedOperation
operation::NodeFusion
diff::Diff
end
struct NodeReduction <: Operation
input::Vector{Node}
end
struct AppliedNodeReduction <: AppliedOperation
operation::NodeReduction
diff::Diff
end
struct NodeSplit <: Operation
input::Node
end
struct AppliedNodeSplit <: AppliedOperation
operation::NodeSplit
diff::Diff
end
const PossibleOperations = NamedTuple{
(:nodeFusions, :nodeReductions, :nodeSplits),
Tuple{Vector{NodeFusion}, Vector{NodeReduction}, Vector{NodeSplit}}
}
function PossibleOperations()
return (
nodeFusions = Vector{NodeFusion}(),
nodeReductions = Vector{NodeReduction}(),
nodeSplits = Vector{NodeSplit}()
)::PossibleOperations
end
# The actual state of the DAG is the initial state given by the set of nodes
# but with all the operations in appliedChain applied in order
mutable struct DAG
nodes::Set{Node}
# The operations currently applied to the set of nodes
appliedOperations::Stack{AppliedOperation}
# The operations not currently applied but part of the current state of the DAG
operationsToApply::Deque{Operation}
# The possible operations at the current state of the DAG
possibleOperations::PossibleOperations
# "snapshot" system: keep track of added/removed nodes/edges since last snapshot
# these are muted in insert_node! etc.
diff::Diff
end
function DAG()
return DAG(Set{Node}())
return DAG(Set{Node}(), Stack{AppliedOperation}(), Deque{Operation}(), PossibleOperations(), Diff())
end

View File

@ -63,43 +63,62 @@ end
is_entry_node(node::Node) = length(children(node)) == 0
is_exit_node(node::Node) = length(parents(node)) == 0
function insert_node(graph::DAG, node::Node)
function insert_node!(graph::DAG, node::Node, track=true)
push!(graph.nodes, node)
if (track) push!(graph.diff.addedNodes, node) end
return node
end
function insert_edge(graph::DAG, edge::Edge)
function insert_edge!(graph::DAG, edge::Edge, track=true)
# edge points from child to parent
push!(edge.edge[1].parents, edge.edge[2])
push!(edge.edge[2].children, edge.edge[1])
if (track) push!(graph.diff.addedEdges) end
return edge
end
function remove_node(graph::DAG, node::Node)
function remove_node!(graph::DAG, node::Node, track=true)
delete!(graph.nodes, node)
if (track) push!(graph.diff.removedNodes, node) end
return nothing
end
function remove_edge(graph::DAG, edge::Edge)
function remove_edge!(graph::DAG, edge::Edge, track=true)
filter!(x -> x != edge.edge[2], edge.edge[1].parents)
filter!(x -> x != edge.edge[1], edge.edge[2].children)
if (track) push!(graph.diff.removedEdges, edge) end
return nothing
end
# return the graph "difference" since last time this function was called
function get_snapshot_diff(graph::DAG)
return swapfield!(graph, :diff, Diff())
return result
end
function graph_properties(graph::DAG)
# make sure the graph is fully generated
apply_all!(graph)
d = 0
ce = 0
ed = 0
for node in graph.nodes
d += data(node.task)
ce += compute_effort(node.task)
ed += length(node.parents)
end
ci = ce / d
result = (data = d,
compute_effort = ce,
compute_intensity = ci)
compute_intensity = ci,
edges = ed)
return result
end

View File

@ -1,5 +1,116 @@
# Fuse nodes n1 -> n2 -> n3 together into one node, return the resulting new node
function node_fusion(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode)
# outside interface
# applies a new operation to the end of the graph
function push_operation!(graph::DAG, operation::Operation)
# 1.: Add the operation to the DAG
push!(graph.operationsToApply, operation)
# 2.: Apply all operations in the chain
apply_all!(graph)
# 3.: Regenerate properties, possible operations from here
end
# reverts the latest applied operation, essentially like a ctrl+z for
function pop_operation!(graph::DAG)
# 1.: Remove the operation from the appliedChain of the DAG
if !isempty(graph.operationsToApply)
pop!(graph.operationsToApply)
elseif !isempty(graph.appliedOperations)
appliedOp = pop!(graph.appliedOperations)
revert_operation!(graph, appliedOp)
else
error("No more operations to pop!")
end
# 2.: Apply all (remaining) operations in the chain
apply_all!(graph)
# 3.: Regenerate properties, possible operations from here
end
# implementation detail functions, don't export
# applies all unapplied operations in the DAG
function apply_all!(graph::DAG)
while !isempty(graph.operationsToApply)
# get next operation to apply from front of the deque
op = popfirst!(graph.operationsToApply)
# apply it
appliedOp = apply_operation!(graph, op)
# push to the end of the appliedOperations deque
push!(graph.appliedOperations, appliedOp)
end
end
function apply_operation!(graph::DAG, operation::Operation)
error("Unknown operation type!")
end
function apply_operation!(graph::DAG, operation::NodeFusion)
diff = node_fusion!(graph, operation.input[1], operation.input[2], operation.input[3])
return AppliedNodeFusion(operation, diff)
end
function apply_operation!(graph::DAG, operation::NodeReduction)
diff = node_reduction!(graph, operation.input[1], operation.input[2])
return AppliedNodeReduction(operation, diff)
end
function apply_operation!(graph::DAG, operation::NodeSplit)
diff = node_split!(graph, operation.input[1])
return AppliedNodeSplit(operation, diff)
end
function revert_operation!(graph::DAG, operation::AppliedOperation)
error("Unknown operation type!")
end
function revert_operation!(graph::DAG, operation::AppliedNodeFusion)
revert_diff!(graph, operation.diff)
return operation.operation
end
function revert_operation!(graph::DAG, operation::AppliedNodeReduction)
revert_diff!(graph, operation.diff)
return operation.operation
end
function revert_operation!(graph::DAG, operation::AppliedNodeSplit)
revert_diff!(graph, operation.diff)
return operation.operation
end
function revert_diff!(graph::DAG, diff)
# add removed nodes, remove added nodes, same for edges
# note the order
for edge in diff.addedEdges
remove_edge!(graph, edge, false)
end
for node in diff.addedNodes
remove_node!(graph, node, false)
end
for node in diff.removedNodes
insert_node!(graph, node, false)
end
for edge in diff.removedEdges
insert_edge!(graph, edge, false)
end
end
# Fuse nodes n1 -> n2 -> n3 together into one node, return the applied difference to the graph
function node_fusion!(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode)
# clear snapshot
get_snapshot_diff(graph)
if !(n1 in graph) || !(n2 in graph) || !(n3 in graph)
error("[Node Fusion] The given nodes are not part of the given graph")
end
@ -22,41 +133,44 @@ function node_fusion(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::Comp
required_edge2 = make_edge(n2, n3)
# remove the edges and nodes that will be replaced by the fused node
remove_edge(graph, required_edge1)
remove_edge(graph, required_edge2)
remove_node(graph, n1)
remove_node(graph, n2)
remove_edge!(graph, required_edge1)
remove_edge!(graph, required_edge2)
remove_node!(graph, n1)
remove_node!(graph, n2)
# get n3's children now so it automatically excludes n2
n3_children = children(n3)
remove_node(graph, n3)
remove_node!(graph, n3)
# create new node with the fused compute task
new_node = ComputeTaskNode(FusedComputeTask{typeof(n1.task), typeof(n3.task)}())
insert_node(graph, new_node)
insert_node!(graph, new_node)
# "repoint" children of n1 to the new node
for child in n1_children
remove_edge(graph, make_edge(child, n1))
insert_edge(graph, make_edge(child, new_node))
remove_edge!(graph, make_edge(child, n1))
insert_edge!(graph, make_edge(child, new_node))
end
# "repoint" children of n3 to the new node
for child in n3_children
remove_edge(graph, make_edge(child, n3))
insert_edge(graph, make_edge(child, new_node))
remove_edge!(graph, make_edge(child, n3))
insert_edge!(graph, make_edge(child, new_node))
end
# "repoint" parents of n3 from new node
for parent in n3_parents
remove_edge(graph, make_edge(n3, parent))
insert_edge(graph, make_edge(new_node, parent))
remove_edge!(graph, make_edge(n3, parent))
insert_edge!(graph, make_edge(new_node, parent))
end
return new_node
return get_snapshot_diff(graph)
end
function node_reduction(graph::DAG, n1::Node, n2::Node)
function node_reduction!(graph::DAG, n1::Node, n2::Node)
# clear snapshot
get_snapshot_diff(graph)
if !(n1 in graph) || !(n2 in graph)
error("[Node Reduction] The given nodes are not part of the given graph")
end
@ -75,20 +189,23 @@ function node_reduction(graph::DAG, n1::Node, n2::Node)
# remove n2 and all its parents and children
for child in n2_children
remove_edge(graph, make_edge(child, n2))
remove_edge!(graph, make_edge(child, n2))
end
for parent in n2_parents
remove_edge(graph, make_edge(n2, parent))
remove_edge!(graph, make_edge(n2, parent))
# add parents of n2 to n1
insert_edge(graph, make_edge(n1, parent))
insert_edge!(graph, make_edge(n1, parent))
end
remove_node(graph, n2)
remove_node!(graph, n2)
return n1
return get_snapshot_diff(graph)
end
function node_split(graph::DAG, n1::Node)
function node_split!(graph::DAG, n1::Node)
# clear snapshot
get_snapshot_diff(graph)
if !(n1 in graph)
error("[Node Split] The given node is not part of the given graph")
end
@ -102,23 +219,24 @@ function node_split(graph::DAG, n1::Node)
for parent in n1_parents
n_copy = copy(n1)
insert_node(graph, n_copy)
insert_edge(graph, make_edge(n_copy, parent))
remove_edge(graph, make_edge(n1, parent))
insert_node!(graph, n_copy)
insert_edge!(graph, make_edge(n_copy, parent))
remove_edge!(graph, make_edge(n1, parent))
for child in n1_children
insert_edge(graph, make_edge(child, n_copy))
insert_edge!(graph, make_edge(child, n_copy))
end
end
return nothing
return get_snapshot_diff(graph)
end
# function to generate all possible optmizations on the graph
function generate_options(graph::DAG)
options = (fusions = Vector{Tuple{ComputeTaskNode, DataTaskNode, ComputeTaskNode}}(),
reductions = Vector{Vector{Node}}(),
splits = Vector{Node}())
options = PossibleOperations()
# make sure the graph is fully generated through
apply_all!(graph)
# find possible node fusions
for node in graph.nodes
@ -137,7 +255,7 @@ function generate_options(graph::DAG)
end
child_node = pop!(node_children)
push!(options.fusions, (child_node, node, parent_node))
push!(options.nodeFusions, NodeFusion((child_node, node, parent_node)))
end
end
@ -159,7 +277,7 @@ function generate_options(graph::DAG)
# only when there's at least one reduction partner, insert the vector
reductionVector = Vector{Node}
push!(reductionVector, node)
push!(options.reductions, reductionVector)
push!(options.nodeReductions, NodeReduction(reductionVector))
end
push!(reductionVector, partner)
@ -171,7 +289,7 @@ function generate_options(graph::DAG)
# find possible node splits
for node in graph.nodes
if (can_split(node))
push!(options.splits, node)
push!(options.nodeSplits, NodeSplit(node))
end
end

View File

@ -39,9 +39,9 @@ function import_txt(filename::String, verbose::Bool = isinteractive())
if (verbose) println("Estimating ", estimate_no_nodes, " Nodes") end
sizehint!(graph.nodes, estimate_no_nodes)
sum_node = insert_node(graph, make_node(ComputeTaskSum()))
global_data_out = insert_node(graph, make_node(DataTask(10)))
insert_edge(graph, make_edge(sum_node, global_data_out))
sum_node = insert_node!(graph, make_node(ComputeTaskSum()), false)
global_data_out = insert_node!(graph, make_node(DataTask(10)), false)
insert_edge!(graph, make_edge(sum_node, global_data_out), false)
# remember the data out nodes for connection
dataOutNodes = Dict()
@ -57,16 +57,16 @@ function import_txt(filename::String, verbose::Bool = isinteractive())
end
if occursin(regex_a, node)
# add nodes and edges for the state reading to u(P(Particle))
data_in = insert_node(graph, make_node(DataTask(4))) # read particle data node
compute_P = insert_node(graph, make_node(ComputeTaskP())) # compute P node
data_Pu = insert_node(graph, make_node(DataTask(6))) # transfer data from P to u
compute_u = insert_node(graph, make_node(ComputeTaskU())) # compute U node
data_out = insert_node(graph, make_node(DataTask(3))) # transfer data out from u
data_in = insert_node!(graph, make_node(DataTask(4)), false) # read particle data node
compute_P = insert_node!(graph, make_node(ComputeTaskP()), false) # compute P node
data_Pu = insert_node!(graph, make_node(DataTask(6)), false) # transfer data from P to u
compute_u = insert_node!(graph, make_node(ComputeTaskU()), false) # compute U node
data_out = insert_node!(graph, make_node(DataTask(3)), false) # transfer data out from u
insert_edge(graph, make_edge(data_in, compute_P))
insert_edge(graph, make_edge(compute_P, data_Pu))
insert_edge(graph, make_edge(data_Pu, compute_u))
insert_edge(graph, make_edge(compute_u, data_out))
insert_edge!(graph, make_edge(data_in, compute_P), false)
insert_edge!(graph, make_edge(compute_P, data_Pu), false)
insert_edge!(graph, make_edge(data_Pu, compute_u), false)
insert_edge!(graph, make_edge(compute_u, data_out), false)
# remember the data_out node for future edges
dataOutNodes[node] = data_out
@ -76,37 +76,37 @@ function import_txt(filename::String, verbose::Bool = isinteractive())
in1 = capt.captures[1]
in2 = capt.captures[2]
compute_v = insert_node(graph, make_node(ComputeTaskV()))
data_out = insert_node(graph, make_node(DataTask(5)))
compute_v = insert_node!(graph, make_node(ComputeTaskV()), false)
data_out = insert_node!(graph, make_node(DataTask(5)), false)
if (occursin(regex_c, capt.captures[1]))
# put an S node after this input
compute_S = insert_node(graph, make_node(ComputeTaskS1()))
data_S_v = insert_node(graph, make_node(DataTask(5)))
compute_S = insert_node!(graph, make_node(ComputeTaskS1()), false)
data_S_v = insert_node!(graph, make_node(DataTask(5)), false)
insert_edge(graph, make_edge(dataOutNodes[capt.captures[1]], compute_S))
insert_edge(graph, make_edge(compute_S, data_S_v))
insert_edge!(graph, make_edge(dataOutNodes[capt.captures[1]], compute_S), false)
insert_edge!(graph, make_edge(compute_S, data_S_v), false)
insert_edge(graph, make_edge(data_S_v, compute_v))
insert_edge!(graph, make_edge(data_S_v, compute_v), false)
else
insert_edge(graph, make_edge(dataOutNodes[capt.captures[1]], compute_v))
insert_edge!(graph, make_edge(dataOutNodes[capt.captures[1]], compute_v), false)
end
if (occursin(regex_c, capt.captures[2]))
# i think the current generator only puts the combined particles in the first space, so this case might never be entered
# put an S node after this input
compute_S = insert_node(graph, make_node(ComputeTaskS1()))
data_S_v = insert_node(graph, make_node(DataTask(5)))
compute_S = insert_node!(graph, make_node(ComputeTaskS1()), false)
data_S_v = insert_node!(graph, make_node(DataTask(5)), false)
insert_edge(graph, make_edge(dataOutNodes[capt.captures[2]], compute_S))
insert_edge(graph, make_edge(compute_S, data_S_v))
insert_edge!(graph, make_edge(dataOutNodes[capt.captures[2]], compute_S), false)
insert_edge!(graph, make_edge(compute_S, data_S_v), false)
insert_edge(graph, make_edge(data_S_v, compute_v))
insert_edge!(graph, make_edge(data_S_v, compute_v), false)
else
insert_edge(graph, make_edge(dataOutNodes[capt.captures[2]], compute_v))
insert_edge!(graph, make_edge(dataOutNodes[capt.captures[2]], compute_v), false)
end
insert_edge(graph, make_edge(compute_v, data_out))
insert_edge!(graph, make_edge(compute_v, data_out), false)
dataOutNodes[node] = data_out
elseif occursin(regex_m, node)
@ -117,22 +117,22 @@ function import_txt(filename::String, verbose::Bool = isinteractive())
in3 = capt.captures[3]
# in2 + in3 with a v
compute_v = insert_node(graph, make_node(ComputeTaskV()))
data_v = insert_node(graph, make_node(DataTask(5)))
compute_v = insert_node!(graph, make_node(ComputeTaskV()), false)
data_v = insert_node!(graph, make_node(DataTask(5)), false)
insert_edge(graph, make_edge(dataOutNodes[in2], compute_v))
insert_edge(graph, make_edge(dataOutNodes[in3], compute_v))
insert_edge(graph, make_edge(compute_v, data_v))
insert_edge!(graph, make_edge(dataOutNodes[in2], compute_v), false)
insert_edge!(graph, make_edge(dataOutNodes[in3], compute_v), false)
insert_edge!(graph, make_edge(compute_v, data_v), false)
# combine with the v of the combined other input
compute_S2 = insert_node(graph, make_node(ComputeTaskS2()))
data_out = insert_node(graph, make_node(DataTask(10)))
compute_S2 = insert_node!(graph, make_node(ComputeTaskS2()), false)
data_out = insert_node!(graph, make_node(DataTask(10)), false)
insert_edge(graph, make_edge(data_v, compute_S2))
insert_edge(graph, make_edge(dataOutNodes[in1], compute_S2))
insert_edge(graph, make_edge(compute_S2, data_out))
insert_edge!(graph, make_edge(data_v, compute_S2), false)
insert_edge!(graph, make_edge(dataOutNodes[in1], compute_S2), false)
insert_edge!(graph, make_edge(compute_S2, data_out), false)
insert_edge(graph, make_edge(data_out, sum_node))
insert_edge!(graph, make_edge(data_out, sum_node), false)
elseif occursin(regex_plus, node)
if (verbose)
println("\rReading Nodes Complete ")