diff --git a/.JuliaFormatter.toml b/.JuliaFormatter.toml new file mode 100644 index 0000000..e6b5e1f --- /dev/null +++ b/.JuliaFormatter.toml @@ -0,0 +1,13 @@ +indent = 4 +margin = 80 +always_for_in = true +for_in_replacement = "in" +whitespace_typedefs = true +whitespace_ops_in_indices = true +long_to_short_function_def = false +always_use_return = true +whitespace_in_kwargs = true +conditional_to_if = true +normalize_line_endings = "unix" + +overwrite = true diff --git a/.gitea/workflows/julia-package-ci.yml b/.gitea/workflows/julia-package-ci.yml index 0f6d916..b81d758 100644 --- a/.gitea/workflows/julia-package-ci.yml +++ b/.gitea/workflows/julia-package-ci.yml @@ -19,13 +19,26 @@ jobs: # run: git lfs checkout - name: Setup Julia environment - uses: https://github.com/julia-actions/setup-julia@v1.9.1 + uses: https://github.com/julia-actions/setup-julia@v1.9.2 with: - version: '1.9.1' + version: '1.9.2' - name: Install dependencies run: julia --project=./ -e 'import Pkg; Pkg.instantiate()' + - name: Format check + run: | + julia --project=./ -e 'using JuliaFormatter; format(".", verbose=true)' + julia --project=./ -e ' + out = Cmd(`git diff --name-only`) |> read |> String + if out == "" + exit(0) + else + @error "Some files have not been formatted !!!" + write(stdout, out) + exit(1) + end' + - name: Run tests run: julia --project=./ -t 4 -e 'import Pkg; Pkg.test()' -O0 diff --git a/Project.toml b/Project.toml index e7ec467..238caf5 100644 --- a/Project.toml +++ b/Project.toml @@ -5,6 +5,7 @@ version = "0.1.0" [deps] DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" +JuliaFormatter = "98e50ef6-434e-11e9-1051-2b60c6c9e899" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" diff --git a/examples/import_bench.jl b/examples/import_bench.jl index d199d28..64a5000 100644 --- a/examples/import_bench.jl +++ b/examples/import_bench.jl @@ -16,7 +16,10 @@ function bench_txt(filepath::String, bench::Bool = true) println(name, ":") g = parse_abc(filepath) print(g) - #println(" Graph size in memory: ", bytes_to_human_readable(Base.summarysize(g))) + println( + " Graph size in memory: ", + bytes_to_human_readable(MetagraphOptimization.mem(g)), + ) if (bench) @btime parse_abc($filepath) @@ -24,7 +27,7 @@ function bench_txt(filepath::String, bench::Bool = true) println(" Get Operations: ") @time get_operations(g) - println() + return println() end function import_bench() @@ -34,7 +37,7 @@ function import_bench() bench_txt("AB->ABBBBBBB.txt") #bench_txt("AB->ABBBBBBBBB.txt") bench_txt("ABAB->ABAB.txt") - bench_txt("ABAB->ABC.txt") + return bench_txt("ABAB->ABC.txt") end import_bench() diff --git a/examples/plot_chain.jl b/examples/plot_chain.jl index 10b8a83..0298120 100644 --- a/examples/plot_chain.jl +++ b/examples/plot_chain.jl @@ -21,7 +21,7 @@ function gen_plot(filepath) x = Vector{Float64}() y = Vector{Float64}() - for i = 1:30 + for i in 1:30 print("\r", i) # push opt = get_operations(g) @@ -38,7 +38,7 @@ function gen_plot(filepath) push_operation!(g, rand(collect(opt.nodeSplits))) println("NS") else - i = i-1 + i = i - 1 end props = graph_properties(g) @@ -48,13 +48,26 @@ function gen_plot(filepath) println("\rDone.") - plot([x[1], x[2]], [y[1], y[2]], linestyle = :solid, linewidth = 1, color = :red, legend=false) + plot( + [x[1], x[2]], + [y[1], y[2]], + linestyle = :solid, + linewidth = 1, + color = :red, + legend = false, + ) # Create lines connecting the reference point to each data point for i in 
3:length(x) - plot!([x[i-1], x[i]], [y[i-1], y[i]], linestyle = :solid, linewidth = 1, color = :red) + plot!( + [x[i - 1], x[i]], + [y[i - 1], y[i]], + linestyle = :solid, + linewidth = 1, + color = :red, + ) end - gui() + return gui() end gen_plot("AB->ABBB.txt") diff --git a/examples/plot_star.jl b/examples/plot_star.jl index 8c9495b..2dca0a2 100644 --- a/examples/plot_star.jl +++ b/examples/plot_star.jl @@ -18,7 +18,7 @@ function gen_plot(filepath) println("Random Walking... ") - for i = 1:30 + for i in 1:30 print("\r", i) # push opt = get_operations(g) @@ -35,7 +35,7 @@ function gen_plot(filepath) push_operation!(g, rand(collect(opt.nodeSplits))) println("NS") else - i = i-1 + i = i - 1 end end @@ -60,7 +60,14 @@ function gen_plot(filepath) push!(y, props.compute_effort) pop_operation!(g) - push!(names, "NF: (" * string(props.data) * ", " * string(props.compute_effort) * ")") + push!( + names, + "NF: (" * + string(props.data) * + ", " * + string(props.compute_effort) * + ")", + ) end for op in opt.nodeReductions push_operation!(g, op) @@ -69,7 +76,14 @@ function gen_plot(filepath) push!(y, props.compute_effort) pop_operation!(g) - push!(names, "NR: (" * string(props.data) * ", " * string(props.compute_effort) * ")") + push!( + names, + "NR: (" * + string(props.data) * + ", " * + string(props.compute_effort) * + ")", + ) end for op in opt.nodeSplits push_operation!(g, op) @@ -78,19 +92,39 @@ function gen_plot(filepath) push!(y, props.compute_effort) pop_operation!(g) - push!(names, "NS: (" * string(props.data) * ", " * string(props.compute_effort) * ")") + push!( + names, + "NS: (" * + string(props.data) * + ", " * + string(props.compute_effort) * + ")", + ) end - - plot([x0, x[1]], [y0, y[1]], linestyle = :solid, linewidth = 1, color = :red, legend=false) + + plot( + [x0, x[1]], + [y0, y[1]], + linestyle = :solid, + linewidth = 1, + color = :red, + legend = false, + ) # Create lines connecting the reference point to each data point for i in 2:length(x) - plot!([x0, x[i]], [y0, y[i]], linestyle = :solid, linewidth = 1, color = :red) + plot!( + [x0, x[i]], + [y0, y[i]], + linestyle = :solid, + linewidth = 1, + color = :red, + ) end #scatter!(x, y, label=names) print(names) - gui() + return gui() end gen_plot("AB->ABBB.txt") diff --git a/examples/profiling_utilities.jl b/examples/profiling_utilities.jl index e279f76..0d14835 100644 --- a/examples/profiling_utilities.jl +++ b/examples/profiling_utilities.jl @@ -5,7 +5,7 @@ function test_random_walk(g::DAG, n::Int64) properties = graph_properties(g) - for i = 1:n + for i in 1:n # choose push or pop if rand(Bool) # push @@ -32,5 +32,5 @@ function test_random_walk(g::DAG, n::Int64) end end - reset_graph!(g) -end \ No newline at end of file + return reset_graph!(g) +end diff --git a/src/MetagraphOptimization.jl b/src/MetagraphOptimization.jl index 5909227..c75c14c 100644 --- a/src/MetagraphOptimization.jl +++ b/src/MetagraphOptimization.jl @@ -1,10 +1,35 @@ module MetagraphOptimization export Node, Edge, ComputeTaskNode, DataTaskNode, DAG -export AbstractTask, AbstractComputeTask, AbstractDataTask, DataTask, FusedComputeTask -export make_node, make_edge, insert_node, insert_edge, is_entry_node, is_exit_node, parents, children, compute, graph_properties, get_exit_node, is_valid -export NodeFusion, NodeReduction, NodeSplit, push_operation!, pop_operation!, can_pop, reset_graph!, get_operations -export parse_abc, ComputeTaskP, ComputeTaskS1, ComputeTaskS2, ComputeTaskV, ComputeTaskU, ComputeTaskSum +export AbstractTask, + AbstractComputeTask, 
AbstractDataTask, DataTask, FusedComputeTask +export make_node, + make_edge, + insert_node, + insert_edge, + is_entry_node, + is_exit_node, + parents, + children, + compute, + graph_properties, + get_exit_node, + is_valid +export NodeFusion, + NodeReduction, + NodeSplit, + push_operation!, + pop_operation!, + can_pop, + reset_graph!, + get_operations +export parse_abc, + ComputeTaskP, + ComputeTaskS1, + ComputeTaskS2, + ComputeTaskV, + ComputeTaskU, + ComputeTaskSum export ==, in, show, isempty, delete!, length @@ -21,29 +46,45 @@ import Base.insert! import Base.collect -include("tasks.jl") -include("nodes.jl") -include("graph.jl") +include("task/type.jl") +include("node/type.jl") +include("diff/type.jl") +include("operation/type.jl") +include("graph/type.jl") include("trie.jl") include("utility.jl") -include("task_functions.jl") -include("node_functions.jl") -include("graph_functions.jl") +include("diff/print.jl") +include("diff/properties.jl") -include("operations/utility.jl") -include("operations/apply.jl") -include("operations/clean.jl") -include("operations/find.jl") -include("operations/get.jl") -include("operations/print.jl") -include("operations/validate.jl") +include("graph/compare.jl") +include("graph/interface.jl") +include("graph/mute.jl") +include("graph/print.jl") +include("graph/properties.jl") +include("graph/validate.jl") -include("graph_interface.jl") +include("node/compare.jl") +include("node/create.jl") +include("node/print.jl") +include("node/properties.jl") +include("node/validate.jl") -include("abc_model/tasks.jl") -include("abc_model/task_functions.jl") -include("abc_model/parse.jl") +include("operation/utility.jl") +include("operation/apply.jl") +include("operation/clean.jl") +include("operation/find.jl") +include("operation/get.jl") +include("operation/print.jl") +include("operation/validate.jl") + +include("task/compare.jl") +include("task/print.jl") +include("task/properties.jl") + +include("models/abc/types.jl") +include("models/abc/properties.jl") +include("models/abc/parse.jl") end # module MetagraphOptimization diff --git a/src/abc_model/tasks.jl b/src/abc_model/tasks.jl deleted file mode 100644 index 40b095e..0000000 --- a/src/abc_model/tasks.jl +++ /dev/null @@ -1,29 +0,0 @@ -struct DataTask <: AbstractDataTask - data::UInt64 -end - -# S task with 1 child -struct ComputeTaskS1 <: AbstractComputeTask -end - -# S task with 2 children -struct ComputeTaskS2 <: AbstractComputeTask -end - -# P task with 0 children -struct ComputeTaskP <: AbstractComputeTask -end - -# v task with 2 children -struct ComputeTaskV <: AbstractComputeTask -end - -# u task with 1 child -struct ComputeTaskU <: AbstractComputeTask -end - -# task that sums all its inputs, n children -struct ComputeTaskSum <: AbstractComputeTask -end - -ABC_TASKS = [DataTask, ComputeTaskS1, ComputeTaskS2, ComputeTaskP, ComputeTaskV, ComputeTaskU, ComputeTaskSum] diff --git a/src/diff/print.jl b/src/diff/print.jl new file mode 100644 index 0000000..d949341 --- /dev/null +++ b/src/diff/print.jl @@ -0,0 +1,6 @@ +function show(io::IO, diff::Diff) + print(io, "Nodes: ") + print(io, length(diff.addedNodes) + length(diff.removedNodes)) + print(io, " Edges: ") + return print(io, length(diff.addedEdges) + length(diff.removedEdges)) +end diff --git a/src/diff/properties.jl b/src/diff/properties.jl new file mode 100644 index 0000000..63dfe67 --- /dev/null +++ b/src/diff/properties.jl @@ -0,0 +1,9 @@ +# return a namedtuple of the lengths of the added/removed nodes/edges +function length(diff::Diff) + return ( + 
addedNodes = length(diff.addedNodes), + removedNodes = length(diff.removedNodes), + addedEdges = length(diff.addedEdges), + removedEdges = length(diff.removedEdges), + ) +end diff --git a/src/diff/type.jl b/src/diff/type.jl new file mode 100644 index 0000000..e7e7471 --- /dev/null +++ b/src/diff/type.jl @@ -0,0 +1,13 @@ +const Diff = NamedTuple{ + (:addedNodes, :removedNodes, :addedEdges, :removedEdges), + Tuple{Vector{Node}, Vector{Node}, Vector{Edge}, Vector{Edge}}, +} + +function Diff() + return ( + addedNodes = Vector{Node}(), + removedNodes = Vector{Node}(), + addedEdges = Vector{Edge}(), + removedEdges = Vector{Edge}(), + )::Diff +end diff --git a/src/graph.jl b/src/graph.jl deleted file mode 100644 index 92ed7f3..0000000 --- a/src/graph.jl +++ /dev/null @@ -1,90 +0,0 @@ -using DataStructures - -const Diff = NamedTuple{ - (:addedNodes, :removedNodes, :addedEdges, :removedEdges), - Tuple{Vector{Node}, Vector{Node}, Vector{Edge}, Vector{Edge}} -} - -function Diff() - return ( - addedNodes = Vector{Node}(), - removedNodes = Vector{Node}(), - addedEdges = Vector{Edge}(), - removedEdges = Vector{Edge}() - )::Diff -end - -# An abstract base class for operations -# an operation can be applied to a DAG -abstract type Operation end - -# An abstract base class for already applied operations -# an applied operation can be reversed iff it is the last applied operation on the DAG -abstract type AppliedOperation end - -struct NodeFusion <: Operation - input::Tuple{ComputeTaskNode, DataTaskNode, ComputeTaskNode} -end - -struct AppliedNodeFusion <: AppliedOperation - operation::NodeFusion - diff::Diff -end - -struct NodeReduction <: Operation - input::Vector{Node} -end - -struct AppliedNodeReduction <: AppliedOperation - operation::NodeReduction - diff::Diff -end - -struct NodeSplit <: Operation - input::Node -end - -struct AppliedNodeSplit <: AppliedOperation - operation::NodeSplit - diff::Diff -end - -mutable struct PossibleOperations - nodeFusions::Set{NodeFusion} - nodeReductions::Set{NodeReduction} - nodeSplits::Set{NodeSplit} -end - -function PossibleOperations() - return PossibleOperations( - Set{NodeFusion}(), - Set{NodeReduction}(), - Set{NodeSplit}() - ) -end - -# The actual state of the DAG is the initial state given by the set of nodes -# but with all the operations in appliedChain applied in order -mutable struct DAG - nodes::Set{Node} - - # The operations currently applied to the set of nodes - appliedOperations::Stack{AppliedOperation} - - # The operations not currently applied but part of the current state of the DAG - operationsToApply::Deque{Operation} - - # The possible operations at the current state of the DAG - possibleOperations::PossibleOperations - - # The set of nodes whose possible operations need to be reevaluated - dirtyNodes::Set{Node} - - # "snapshot" system: keep track of added/removed nodes/edges since last snapshot - # these are muted in insert_node! etc. 
- diff::Diff -end - -function DAG() - return DAG(Set{Node}(), Stack{AppliedOperation}(), Deque{Operation}(), PossibleOperations(), Set{Node}(), Diff()) -end diff --git a/src/graph/compare.jl b/src/graph/compare.jl new file mode 100644 index 0000000..8a67523 --- /dev/null +++ b/src/graph/compare.jl @@ -0,0 +1,14 @@ + +in(node::Node, graph::DAG) = node in graph.nodes +in(edge::Edge, graph::DAG) = edge in graph.edges + +function ==(n1::Node, n2::Node, g::DAG) + if typeof(n1) != typeof(n2) + return false + end + if !(n1 in g) || !(n2 in g) + return false + end + + return n1.task == n2.task && children(n1) == children(n2) +end diff --git a/src/graph/interface.jl b/src/graph/interface.jl new file mode 100644 index 0000000..20df42a --- /dev/null +++ b/src/graph/interface.jl @@ -0,0 +1,36 @@ +# user interface on the DAG + +# applies a new operation to the end of the graph +function push_operation!(graph::DAG, operation::Operation) + # 1.: Add the operation to the DAG + push!(graph.operationsToApply, operation) + + return nothing +end + +# reverts the latest applied operation, essentially like a ctrl+z for +function pop_operation!(graph::DAG) + # 1.: Remove the operation from the appliedChain of the DAG + if !isempty(graph.operationsToApply) + pop!(graph.operationsToApply) + elseif !isempty(graph.appliedOperations) + appliedOp = pop!(graph.appliedOperations) + revert_operation!(graph, appliedOp) + else + error("No more operations to pop!") + end + + return nothing +end + +can_pop(graph::DAG) = + !isempty(graph.operationsToApply) || !isempty(graph.appliedOperations) + +# reset the graph to its initial state with no operations applied +function reset_graph!(graph::DAG) + while (can_pop(graph)) + pop_operation!(graph) + end + + return nothing +end diff --git a/src/graph/mute.jl b/src/graph/mute.jl new file mode 100644 index 0000000..94e68c8 --- /dev/null +++ b/src/graph/mute.jl @@ -0,0 +1,200 @@ +# for graph mutating functions we need to do a few things +# 1: mute the graph (duh) +# 2: keep track of what was changed for the diff (if track == true) +# 3: invalidate operation caches + +function insert_node!( + graph::DAG, + node::Node, + track = true, + invalidate_cache = true, +) + # 1: mute + push!(graph.nodes, node) + + # 2: keep track + if (track) + push!(graph.diff.addedNodes, node) + end + + # 3: invalidate caches + if (!invalidate_cache) + return node + end + push!(graph.dirtyNodes, node) + + return node +end + +function insert_edge!( + graph::DAG, + node1::Node, + node2::Node, + track = true, + invalidate_cache = true, +) + # @assert (node2 ∉ node1.parents) && (node1 ∉ node2.children) "Edge to insert already exists" + + # 1: mute + # edge points from child to parent + push!(node1.parents, node2) + push!(node2.children, node1) + + # 2: keep track + if (track) + push!(graph.diff.addedEdges, make_edge(node1, node2)) + end + + # 3: invalidate caches + if (!invalidate_cache) + return nothing + end + + invalidate_operation_caches!(graph, node1) + invalidate_operation_caches!(graph, node2) + + push!(graph.dirtyNodes, node1) + push!(graph.dirtyNodes, node2) + + return nothing +end + +function remove_node!( + graph::DAG, + node::Node, + track = true, + invalidate_cache = true, +) + # @assert node in graph.nodes "Trying to remove a node that's not in the graph" + + # 1: mute + delete!(graph.nodes, node) + + # 2: keep track + if (track) + push!(graph.diff.removedNodes, node) + end + + # 3: invalidate caches + if (!invalidate_cache) + return nothing + end + + invalidate_operation_caches!(graph, node) + 
delete!(graph.dirtyNodes, node) + + return nothing +end + +function remove_edge!( + graph::DAG, + node1::Node, + node2::Node, + track = true, + invalidate_cache = true, +) + # 1: mute + pre_length1 = length(node1.parents) + pre_length2 = length(node2.children) + filter!(x -> x != node2, node1.parents) + filter!(x -> x != node1, node2.children) + + #=@assert begin + removed = pre_length1 - length(node1.parents) + removed <= 1 + end "removed more than one node from node1's parents"=# + + #=@assert begin + removed = pre_length2 - length(node2.children) + removed <= 1 + end "removed more than one node from node2's children"=# + + # 2: keep track + if (track) + push!(graph.diff.removedEdges, make_edge(node1, node2)) + end + + # 3: invalidate caches + if (!invalidate_cache) + return nothing + end + + invalidate_operation_caches!(graph, node1) + invalidate_operation_caches!(graph, node2) + if (node1 in graph) + push!(graph.dirtyNodes, node1) + end + if (node2 in graph) + push!(graph.dirtyNodes, node2) + end + + return nothing +end + +# return the graph "difference" since last time this function was called +function get_snapshot_diff(graph::DAG) + return swapfield!(graph, :diff, Diff()) +end + +# function to invalidate the operation caches for a given NodeFusion +function invalidate_caches!(graph::DAG, operation::NodeFusion) + delete!(graph.possibleOperations, operation) + + # delete the operation from all caches of nodes involved in the operation + filter!(!=(operation), operation.input[1].nodeFusions) + filter!(!=(operation), operation.input[3].nodeFusions) + + operation.input[2].nodeFusion = missing + + return nothing +end + +# function to invalidate the operation caches for a given NodeReduction +function invalidate_caches!(graph::DAG, operation::NodeReduction) + delete!(graph.possibleOperations, operation) + + for node in operation.input + node.nodeReduction = missing + end + + return nothing +end + +# function to invalidate the operation caches for a given NodeSplit +function invalidate_caches!(graph::DAG, operation::NodeSplit) + delete!(graph.possibleOperations, operation) + + # delete the operation from all caches of nodes involved in the operation + # for node split there is only one node + operation.input.nodeSplit = missing + + return nothing +end + +# function to invalidate the operation caches of a ComputeTaskNode +function invalidate_operation_caches!(graph::DAG, node::ComputeTaskNode) + if !ismissing(node.nodeReduction) + invalidate_caches!(graph, node.nodeReduction) + end + if !ismissing(node.nodeSplit) + invalidate_caches!(graph, node.nodeSplit) + end + while !isempty(node.nodeFusions) + invalidate_caches!(graph, pop!(node.nodeFusions)) + end + return nothing +end + +# function to invalidate the operation caches of a DataTaskNode +function invalidate_operation_caches!(graph::DAG, node::DataTaskNode) + if !ismissing(node.nodeReduction) + invalidate_caches!(graph, node.nodeReduction) + end + if !ismissing(node.nodeSplit) + invalidate_caches!(graph, node.nodeSplit) + end + if !ismissing(node.nodeFusion) + invalidate_caches!(graph, node.nodeFusion) + end + return nothing +end diff --git a/src/graph/print.jl b/src/graph/print.jl new file mode 100644 index 0000000..7717724 --- /dev/null +++ b/src/graph/print.jl @@ -0,0 +1,59 @@ +function show_nodes(io, graph::DAG) + print(io, "[") + first = true + for n in graph.nodes + if first + first = false + else + print(io, ", ") + end + print(io, n) + end + return print(io, "]") +end + +function show(io::IO, graph::DAG) + println(io, "Graph:") + 
print(io, " Nodes: ") + + nodeDict = Dict{Type, Int64}() + noEdges = 0 + for node in graph.nodes + if haskey(nodeDict, typeof(node.task)) + nodeDict[typeof(node.task)] = nodeDict[typeof(node.task)] + 1 + else + nodeDict[typeof(node.task)] = 1 + end + noEdges += length(parents(node)) + end + + if length(graph.nodes) <= 20 + show_nodes(io, graph) + else + print("Total: ", length(graph.nodes), ", ") + first = true + i = 0 + for (type, number) in zip(keys(nodeDict), values(nodeDict)) + i += 1 + if first + first = false + else + print(", ") + end + if (i % 3 == 0) + print("\n ") + end + print(type, ": ", number) + end + end + println(io) + println(io, " Edges: ", noEdges) + properties = graph_properties(graph) + println(io, " Total Compute Effort: ", properties.compute_effort) + println(io, " Total Data Transfer: ", properties.data) + return println( + io, + " Total Compute Intensity: ", + properties.compute_intensity, + ) +end diff --git a/src/graph/properties.jl b/src/graph/properties.jl new file mode 100644 index 0000000..da254d2 --- /dev/null +++ b/src/graph/properties.jl @@ -0,0 +1,33 @@ +function graph_properties(graph::DAG) + # make sure the graph is fully generated + apply_all!(graph) + + d = 0 + ce = 0 + ed = 0 + for node in graph.nodes + d += data(node.task) * length(node.parents) + ce += compute_effort(node.task) + ed += length(node.parents) + end + + ci = ce / d + + result = ( + data = d, + compute_effort = ce, + compute_intensity = ci, + nodes = length(graph.nodes), + edges = ed, + ) + return result +end + +function get_exit_node(graph::DAG) + for node in graph.nodes + if (is_exit_node(node)) + return node + end + end + @assert false "The given graph has no exit node! It is either empty or not acyclic!" +end diff --git a/src/graph/type.jl b/src/graph/type.jl new file mode 100644 index 0000000..06cc115 --- /dev/null +++ b/src/graph/type.jl @@ -0,0 +1,48 @@ +using DataStructures + +mutable struct PossibleOperations + nodeFusions::Set{NodeFusion} + nodeReductions::Set{NodeReduction} + nodeSplits::Set{NodeSplit} +end + +function PossibleOperations() + return PossibleOperations( + Set{NodeFusion}(), + Set{NodeReduction}(), + Set{NodeSplit}(), + ) +end + +# The actual state of the DAG is the initial state given by the set of nodes +# but with all the operations in appliedChain applied in order +mutable struct DAG + nodes::Set{Node} + + # The operations currently applied to the set of nodes + appliedOperations::Stack{AppliedOperation} + + # The operations not currently applied but part of the current state of the DAG + operationsToApply::Deque{Operation} + + # The possible operations at the current state of the DAG + possibleOperations::PossibleOperations + + # The set of nodes whose possible operations need to be reevaluated + dirtyNodes::Set{Node} + + # "snapshot" system: keep track of added/removed nodes/edges since last snapshot + # these are muted in insert_node! etc. 
+ diff::Diff +end + +function DAG() + return DAG( + Set{Node}(), + Stack{AppliedOperation}(), + Deque{Operation}(), + PossibleOperations(), + Set{Node}(), + Diff(), + ) +end diff --git a/src/graph/validate.jl b/src/graph/validate.jl new file mode 100644 index 0000000..d2c7dba --- /dev/null +++ b/src/graph/validate.jl @@ -0,0 +1,52 @@ +# check whether the given graph is connected +function is_connected(graph::DAG) + nodeQueue = Deque{Node}() + push!(nodeQueue, get_exit_node(graph)) + seenNodes = Set{Node}() + + while !isempty(nodeQueue) + current = pop!(nodeQueue) + push!(seenNodes, current) + + for child in current.children + push!(nodeQueue, child) + end + end + + return length(seenNodes) == length(graph.nodes) +end + +function is_valid(graph::DAG) + for node in graph.nodes + @assert is_valid(graph, node) + end + + for op in graph.operationsToApply + @assert is_valid(graph, op) + end + + for nr in graph.possibleOperations.nodeReductions + @assert is_valid(graph, nr) + end + for ns in graph.possibleOperations.nodeSplits + @assert is_valid(graph, ns) + end + for nf in graph.possibleOperations.nodeFusions + @assert is_valid(graph, nf) + end + + for node in graph.dirtyNodes + @assert node in graph "Dirty Node is not part of the graph!" + @assert ismissing(node.nodeReduction) "Dirty Node has a NodeReduction!" + @assert ismissing(node.nodeSplit) "Dirty Node has a NodeSplit!" + if (typeof(node) <: DataTaskNode) + @assert ismissing(node.nodeFusion) "Dirty DataTaskNode has a Node Fusion!" + elseif (typeof(node) <: ComputeTaskNode) + @assert isempty(node.nodeFusions) "Dirty ComputeTaskNode has Node Fusions!" + end + end + + @assert is_connected(graph) "Graph is not connected!" + + return true +end diff --git a/src/graph_functions.jl b/src/graph_functions.jl deleted file mode 100644 index 58aac5b..0000000 --- a/src/graph_functions.jl +++ /dev/null @@ -1,354 +0,0 @@ -using DataStructures - -in(node::Node, graph::DAG) = node in graph.nodes -in(edge::Edge, graph::DAG) = edge in graph.edges - -function is_parent(potential_parent, node) - return potential_parent in node.parents -end - -function is_child(potential_child, node) - return potential_child in node.children -end - -function ==(n1::Node, n2::Node, g::DAG) - if typeof(n1) != typeof(n2) - return false - end - if !(n1 in g) || !(n2 in g) - return false - end - - return n1.task == n2.task && children(n1) == children(n2) -end - -# children = prerequisite nodes, nodes that need to execute before the task, edges point into this task -function children(node::Node) - return copy(node.children) -end - -# parents = subsequent nodes, nodes that need this node to execute, edges point from this task -function parents(node::Node) - return copy(node.parents) -end - -# siblings = all children of any parents, no duplicates, includes the node itself -function siblings(node::Node) - result = Set{Node}() - push!(result, node) - for parent in node.parents - union!(result, parent.children) - end - - return result -end - -# partners = all parents of any children, no duplicates, includes the node itself -function partners(node::Node) - result = Set{Node}() - push!(result, node) - for child in node.children - union!(result, child.parents) - end - - return result -end - -# alternative version to partners(Node), avoiding allocation of a new set -# works on the given set and returns nothing -function partners(node::Node, set::Set{Node}) - push!(set, node) - for child in node.children - union!(set, child.parents) - end - return nothing -end - -is_entry_node(node::Node) = 
length(node.children) == 0 -is_exit_node(node::Node) = length(node.parents) == 0 - -# function to invalidate the operation caches for a given NodeFusion -function invalidate_caches!(graph::DAG, operation::NodeFusion) - delete!(graph.possibleOperations, operation) - - # delete the operation from all caches of nodes involved in the operation - filter!(!=(operation), operation.input[1].nodeFusions) - filter!(!=(operation), operation.input[3].nodeFusions) - - operation.input[2].nodeFusion = missing - - return nothing -end - -# function to invalidate the operation caches for a given NodeReduction -function invalidate_caches!(graph::DAG, operation::NodeReduction) - delete!(graph.possibleOperations, operation) - - for node in operation.input - node.nodeReduction = missing - end - - return nothing -end - -# function to invalidate the operation caches for a given NodeSplit -function invalidate_caches!(graph::DAG, operation::NodeSplit) - delete!(graph.possibleOperations, operation) - - # delete the operation from all caches of nodes involved in the operation - # for node split there is only one node - operation.input.nodeSplit = missing - - return nothing -end - -# function to invalidate the operation caches of a ComputeTaskNode -function invalidate_operation_caches!(graph::DAG, node::ComputeTaskNode) - if !ismissing(node.nodeReduction) - invalidate_caches!(graph, node.nodeReduction) - end - if !ismissing(node.nodeSplit) - invalidate_caches!(graph, node.nodeSplit) - end - while !isempty(node.nodeFusions) - invalidate_caches!(graph, pop!(node.nodeFusions)) - end - return nothing -end - -# function to invalidate the operation caches of a DataTaskNode -function invalidate_operation_caches!(graph::DAG, node::DataTaskNode) - if !ismissing(node.nodeReduction) - invalidate_caches!(graph, node.nodeReduction) - end - if !ismissing(node.nodeSplit) - invalidate_caches!(graph, node.nodeSplit) - end - if !ismissing(node.nodeFusion) - invalidate_caches!(graph, node.nodeFusion) - end - return nothing -end - -# for graph mutating functions we need to do a few things -# 1: mute the graph (duh) -# 2: keep track of what was changed for the diff (if track == true) -# 3: invalidate operation caches - -function insert_node!(graph::DAG, node::Node, track=true, invalidate_cache=true) - # 1: mute - push!(graph.nodes, node) - - # 2: keep track - if (track) push!(graph.diff.addedNodes, node) end - - # 3: invalidate caches - if (!invalidate_cache) return node end - push!(graph.dirtyNodes, node) - - return node -end - -function insert_edge!(graph::DAG, node1::Node, node2::Node, track=true, invalidate_cache=true) - # @assert (node2 ∉ node1.parents) && (node1 ∉ node2.children) "Edge to insert already exists" - - # 1: mute - # edge points from child to parent - push!(node1.parents, node2) - push!(node2.children, node1) - - # 2: keep track - if (track) push!(graph.diff.addedEdges, make_edge(node1, node2)) end - - # 3: invalidate caches - if (!invalidate_cache) return nothing end - - invalidate_operation_caches!(graph, node1) - invalidate_operation_caches!(graph, node2) - - push!(graph.dirtyNodes, node1) - push!(graph.dirtyNodes, node2) - - return nothing -end - -function remove_node!(graph::DAG, node::Node, track=true, invalidate_cache=true) - # @assert node in graph.nodes "Trying to remove a node that's not in the graph" - - # 1: mute - delete!(graph.nodes, node) - - # 2: keep track - if (track) push!(graph.diff.removedNodes, node) end - - # 3: invalidate caches - if (!invalidate_cache) return nothing end - - 
invalidate_operation_caches!(graph, node) - delete!(graph.dirtyNodes, node) - - return nothing -end - -function remove_edge!(graph::DAG, node1::Node, node2::Node, track=true, invalidate_cache=true) - # 1: mute - pre_length1 = length(node1.parents) - pre_length2 = length(node2.children) - filter!(x -> x != node2, node1.parents) - filter!(x -> x != node1, node2.children) - - #=@assert begin - removed = pre_length1 - length(node1.parents) - removed <= 1 - end "removed more than one node from node1's parents"=# - - #=@assert begin - removed = pre_length2 - length(node2.children) - removed <= 1 - end "removed more than one node from node2's children"=# - - # 2: keep track - if (track) push!(graph.diff.removedEdges, make_edge(node1, node2)) end - - # 3: invalidate caches - if (!invalidate_cache) return nothing end - - invalidate_operation_caches!(graph, node1) - invalidate_operation_caches!(graph, node2) - if (node1 in graph) - push!(graph.dirtyNodes, node1) - end - if (node2 in graph) - push!(graph.dirtyNodes, node2) - end - - return nothing -end - -# return the graph "difference" since last time this function was called -function get_snapshot_diff(graph::DAG) - return swapfield!(graph, :diff, Diff()) -end - -function graph_properties(graph::DAG) - # make sure the graph is fully generated - apply_all!(graph) - - d = 0 - ce = 0 - ed = 0 - for node in graph.nodes - d += data(node.task) * length(node.parents) - ce += compute_effort(node.task) - ed += length(node.parents) - end - - ci = ce / d - - result = (data = d, - compute_effort = ce, - compute_intensity = ci, - nodes = length(graph.nodes), - edges = ed) - return result -end - -function get_exit_node(graph::DAG) - for node in graph.nodes - if (is_exit_node(node)) - return node - end - end - @assert false "The given graph has no exit node! It is either empty or not acyclic!" 
-end - -# check whether the given graph is connected -function is_valid(graph::DAG) - nodeQueue = Deque{Node}() - push!(nodeQueue, get_exit_node(graph)) - seenNodes = Set{Node}() - - while !isempty(nodeQueue) - current = pop!(nodeQueue) - push!(seenNodes, current) - - for child in current.chlidren - push!(nodeQueue, child) - end - end - - return length(seenNodes) == length(graph.nodes) -end - -function show_nodes(io, graph::DAG) - print(io, "[") - first = true - for n in graph.nodes - if first - first = false - else - print(io, ", ") - end - print(io, n) - end - print(io, "]") -end - -function show(io::IO, graph::DAG) - println(io, "Graph:") - print(io, " Nodes: ") - - nodeDict = Dict{Type, Int64}() - noEdges = 0 - for node in graph.nodes - if haskey(nodeDict, typeof(node.task)) - nodeDict[typeof(node.task)] = nodeDict[typeof(node.task)] + 1 - else - nodeDict[typeof(node.task)] = 1 - end - noEdges += length(parents(node)) - end - - if length(graph.nodes) <= 20 - show_nodes(io, graph) - else - print("Total: ", length(graph.nodes), ", ") - first = true - i = 0 - for (type, number) in zip(keys(nodeDict), values(nodeDict)) - i += 1 - if first - first = false - else - print(", ") - end - if (i % 3 == 0) - print("\n ") - end - print(type, ": ", number) - end - end - println(io) - println(io, " Edges: ", noEdges) - properties = graph_properties(graph) - println(io, " Total Compute Effort: ", properties.compute_effort) - println(io, " Total Data Transfer: ", properties.data) - println(io, " Total Compute Intensity: ", properties.compute_intensity) -end - -function show(io::IO, diff::Diff) - print(io, "Nodes: ") - print(io, length(diff.addedNodes) + length(diff.removedNodes)) - print(io, " Edges: ") - print(io, length(diff.addedEdges) + length(diff.removedEdges)) -end - -# return a namedtuple of the lengths of the added/removed nodes/edges -function length(diff::Diff) - return ( - addedNodes = length(diff.addedNodes), - removedNodes = length(diff.removedNodes), - addedEdges = length(diff.addedEdges), - removedEdges = length(diff.removedEdges) - ) -end diff --git a/src/graph_interface.jl b/src/graph_interface.jl deleted file mode 100644 index 7cc414f..0000000 --- a/src/graph_interface.jl +++ /dev/null @@ -1,34 +0,0 @@ -# user interface on the DAG - -# applies a new operation to the end of the graph -function push_operation!(graph::DAG, operation::Operation) - # 1.: Add the operation to the DAG - push!(graph.operationsToApply, operation) - - return nothing -end - -# reverts the latest applied operation, essentially like a ctrl+z for -function pop_operation!(graph::DAG) - # 1.: Remove the operation from the appliedChain of the DAG - if !isempty(graph.operationsToApply) - pop!(graph.operationsToApply) - elseif !isempty(graph.appliedOperations) - appliedOp = pop!(graph.appliedOperations) - revert_operation!(graph, appliedOp) - else - error("No more operations to pop!") - end - return nothing -end - -can_pop(graph::DAG) = !isempty(graph.operationsToApply) || !isempty(graph.appliedOperations) - -# reset the graph to its initial state with no operations applied -function reset_graph!(graph::DAG) - while (can_pop(graph)) - pop_operation!(graph) - end - - return nothing -end diff --git a/src/abc_model/parse.jl b/src/models/abc/parse.jl similarity index 74% rename from src/abc_model/parse.jl rename to src/models/abc/parse.jl index 1d50df9..e5ed4c5 100644 --- a/src/abc_model/parse.jl +++ b/src/models/abc/parse.jl @@ -24,20 +24,26 @@ end function parse_abc(filename::String, verbose::Bool = false) file = 
open(filename, "r") - if (verbose) println("Opened file") end + if (verbose) + println("Opened file") + end nodes_string = readline(file) nodes = parse_nodes(nodes_string) close(file) - if (verbose) println("Read file") end + if (verbose) + println("Read file") + end graph = DAG() - + # estimate total number of nodes # try to slightly overestimate so no resizing is necessary # data nodes are not included in length(nodes) and there are a few more than compute nodes estimate_no_nodes = round(Int, length(nodes) * 4) - if (verbose) println("Estimating ", estimate_no_nodes, " Nodes") end + if (verbose) + println("Estimating ", estimate_no_nodes, " Nodes") + end sizehint!(graph.nodes, estimate_no_nodes) sum_node = insert_node!(graph, make_node(ComputeTaskSum()), false, false) @@ -47,43 +53,58 @@ function parse_abc(filename::String, verbose::Bool = false) # remember the data out nodes for connection dataOutNodes = Dict() - if (verbose) println("Building graph") end + if (verbose) + println("Building graph") + end noNodes = 0 nodesToRead = length(nodes) while !isempty(nodes) node = popfirst!(nodes) noNodes += 1 if (noNodes % 100 == 0) - if (verbose) @printf "\rReading Nodes... %.2f%%" (100. * noNodes / nodesToRead) end + if (verbose) + @printf "\rReading Nodes... %.2f%%" ( + 100.0 * noNodes / nodesToRead + ) + end end if occursin(regex_a, node) # add nodes and edges for the state reading to u(P(Particle)) data_in = insert_node!(graph, make_node(DataTask(4)), false, false) # read particle data node - compute_P = insert_node!(graph, make_node(ComputeTaskP()), false, false) # compute P node + compute_P = + insert_node!(graph, make_node(ComputeTaskP()), false, false) # compute P node data_Pu = insert_node!(graph, make_node(DataTask(6)), false, false) # transfer data from P to u - compute_u = insert_node!(graph, make_node(ComputeTaskU()), false, false) # compute U node + compute_u = + insert_node!(graph, make_node(ComputeTaskU()), false, false) # compute U node data_out = insert_node!(graph, make_node(DataTask(3)), false, false) # transfer data out from u insert_edge!(graph, data_in, compute_P, false, false) insert_edge!(graph, compute_P, data_Pu, false, false) insert_edge!(graph, data_Pu, compute_u, false, false) insert_edge!(graph, compute_u, data_out, false, false) - + # remember the data_out node for future edges dataOutNodes[node] = data_out elseif occursin(regex_c, node) capt = match(regex_c, node) - + in1 = capt.captures[1] in2 = capt.captures[2] - compute_v = insert_node!(graph, make_node(ComputeTaskV()), false, false) + compute_v = + insert_node!(graph, make_node(ComputeTaskV()), false, false) data_out = insert_node!(graph, make_node(DataTask(5)), false, false) if (occursin(regex_c, in1)) # put an S node after this input - compute_S = insert_node!(graph, make_node(ComputeTaskS1()), false, false) - data_S_v = insert_node!(graph, make_node(DataTask(5)), false, false) + compute_S = insert_node!( + graph, + make_node(ComputeTaskS1()), + false, + false, + ) + data_S_v = + insert_node!(graph, make_node(DataTask(5)), false, false) insert_edge!(graph, dataOutNodes[in1], compute_S, false, false) insert_edge!(graph, compute_S, data_S_v, false, false) @@ -96,8 +117,14 @@ function parse_abc(filename::String, verbose::Bool = false) if (occursin(regex_c, in2)) # i think the current generator only puts the combined particles in the first space, so this case might never be entered # put an S node after this input - compute_S = insert_node!(graph, make_node(ComputeTaskS1()), false, false) - data_S_v = 
insert_node!(graph, make_node(DataTask(5)), false, false) + compute_S = insert_node!( + graph, + make_node(ComputeTaskS1()), + false, + false, + ) + data_S_v = + insert_node!(graph, make_node(DataTask(5)), false, false) insert_edge!(graph, dataOutNodes[in2], compute_S, false, false) insert_edge!(graph, compute_S, data_S_v, false, false) @@ -106,7 +133,7 @@ function parse_abc(filename::String, verbose::Bool = false) else insert_edge!(graph, dataOutNodes[in2], compute_v, false, false) end - + insert_edge!(graph, compute_v, data_out, false, false) dataOutNodes[node] = data_out @@ -118,7 +145,8 @@ function parse_abc(filename::String, verbose::Bool = false) in3 = capt.captures[3] # in2 + in3 with a v - compute_v = insert_node!(graph, make_node(ComputeTaskV()), false, false) + compute_v = + insert_node!(graph, make_node(ComputeTaskV()), false, false) data_v = insert_node!(graph, make_node(DataTask(5)), false, false) insert_edge!(graph, dataOutNodes[in2], compute_v, false, false) @@ -126,8 +154,10 @@ function parse_abc(filename::String, verbose::Bool = false) insert_edge!(graph, compute_v, data_v, false, false) # combine with the v of the combined other input - compute_S2 = insert_node!(graph, make_node(ComputeTaskS2()), false, false) - data_out = insert_node!(graph, make_node(DataTask(10)), false, false) + compute_S2 = + insert_node!(graph, make_node(ComputeTaskS2()), false, false) + data_out = + insert_node!(graph, make_node(DataTask(10)), false, false) insert_edge!(graph, data_v, compute_S2, false, false) insert_edge!(graph, dataOutNodes[in1], compute_S2, false, false) @@ -136,11 +166,13 @@ function parse_abc(filename::String, verbose::Bool = false) insert_edge!(graph, data_out, sum_node, false, false) elseif occursin(regex_plus, node) if (verbose) - println("\rReading Nodes Complete ") + println("\rReading Nodes Complete ") println("Added ", length(graph.nodes), " nodes") end else - @assert false ("Unknown node '$node' while reading from file $filename") + @assert false ( + "Unknown node '$node' while reading from file $filename" + ) end end diff --git a/src/abc_model/task_functions.jl b/src/models/abc/properties.jl similarity index 94% rename from src/abc_model/task_functions.jl rename to src/models/abc/properties.jl index 7a86b32..a4f6506 100644 --- a/src/abc_model/task_functions.jl +++ b/src/models/abc/properties.jl @@ -8,7 +8,7 @@ compute_effort(t::ComputeTaskP) = 15 compute_effort(t::ComputeTaskSum) = 1 function show(io::IO, t::DataTask) - print(io, "Data", t.data) + return print(io, "Data", t.data) end show(io::IO, t::ComputeTaskS1) = print("ComputeS1") diff --git a/src/models/abc/types.jl b/src/models/abc/types.jl new file mode 100644 index 0000000..5404b21 --- /dev/null +++ b/src/models/abc/types.jl @@ -0,0 +1,31 @@ +struct DataTask <: AbstractDataTask + data::UInt64 +end + +# S task with 1 child +struct ComputeTaskS1 <: AbstractComputeTask end + +# S task with 2 children +struct ComputeTaskS2 <: AbstractComputeTask end + +# P task with 0 children +struct ComputeTaskP <: AbstractComputeTask end + +# v task with 2 children +struct ComputeTaskV <: AbstractComputeTask end + +# u task with 1 child +struct ComputeTaskU <: AbstractComputeTask end + +# task that sums all its inputs, n children +struct ComputeTaskSum <: AbstractComputeTask end + +ABC_TASKS = [ + DataTask, + ComputeTaskS1, + ComputeTaskS2, + ComputeTaskP, + ComputeTaskV, + ComputeTaskU, + ComputeTaskSum, +] diff --git a/src/node/compare.jl b/src/node/compare.jl new file mode 100644 index 0000000..3743690 --- /dev/null +++ 
b/src/node/compare.jl @@ -0,0 +1,15 @@ +function ==(e1::Edge, e2::Edge) + return e1.edge[1] == e2.edge[1] && e1.edge[2] == e2.edge[2] +end + +function ==(n1::Node, n2::Node) + return false +end + +function ==(n1::ComputeTaskNode, n2::ComputeTaskNode) + return n1.id == n2.id +end + +function ==(n1::DataTaskNode, n2::DataTaskNode) + return n1.id == n2.id +end diff --git a/src/node/create.jl b/src/node/create.jl new file mode 100644 index 0000000..b13d990 --- /dev/null +++ b/src/node/create.jl @@ -0,0 +1,23 @@ +function make_node(t::AbstractTask) + return error("Cannot make a node from this task type") +end + +function make_node(t::AbstractDataTask) + return DataTaskNode(t) +end + +function make_node(t::AbstractComputeTask) + return ComputeTaskNode(t) +end + +function make_edge(n1::Node, n2::Node) + return error("Can only create edges from compute to data node or reverse") +end + +function make_edge(n1::ComputeTaskNode, n2::DataTaskNode) + return Edge((n1, n2)) +end + +function make_edge(n1::DataTaskNode, n2::ComputeTaskNode) + return Edge((n1, n2)) +end diff --git a/src/node/print.jl b/src/node/print.jl new file mode 100644 index 0000000..debaa88 --- /dev/null +++ b/src/node/print.jl @@ -0,0 +1,7 @@ +function show(io::IO, n::Node) + return print(io, "Node(", n.task, ")") +end + +function show(io::IO, e::Edge) + return print(io, "Edge(", e.edge[1], ", ", e.edge[2], ")") +end diff --git a/src/node/properties.jl b/src/node/properties.jl new file mode 100644 index 0000000..abbe252 --- /dev/null +++ b/src/node/properties.jl @@ -0,0 +1,52 @@ +is_entry_node(node::Node) = length(node.children) == 0 +is_exit_node(node::Node) = length(node.parents) == 0 + +# children = prerequisite nodes, nodes that need to execute before the task, edges point into this task +function children(node::Node) + return copy(node.children) +end + +# parents = subsequent nodes, nodes that need this node to execute, edges point from this task +function parents(node::Node) + return copy(node.parents) +end + +# siblings = all children of any parents, no duplicates, includes the node itself +function siblings(node::Node) + result = Set{Node}() + push!(result, node) + for parent in node.parents + union!(result, parent.children) + end + + return result +end + +# partners = all parents of any children, no duplicates, includes the node itself +function partners(node::Node) + result = Set{Node}() + push!(result, node) + for child in node.children + union!(result, child.parents) + end + + return result +end + +# alternative version to partners(Node), avoiding allocation of a new set +# works on the given set and returns nothing +function partners(node::Node, set::Set{Node}) + push!(set, node) + for child in node.children + union!(set, child.parents) + end + return nothing +end + +function is_parent(potential_parent, node) + return potential_parent in node.parents +end + +function is_child(potential_child, node) + return potential_child in node.children +end diff --git a/src/node/type.jl b/src/node/type.jl new file mode 100644 index 0000000..45ff267 --- /dev/null +++ b/src/node/type.jl @@ -0,0 +1,95 @@ +using Random +using UUIDs +using Base.Threads + +# TODO: reliably find out how many threads we're running with (nthreads() returns 1 when precompiling :/) +rng = [Random.MersenneTwister(0) for _ in 1:32] + +abstract type Node end + +# declare this type here because it's needed +# the specific operations are declared in graph.jl +abstract type Operation end + +mutable struct DataTaskNode <: Node + task::AbstractDataTask + + # use vectors 
as sets have way too much memory overhead + parents::Vector{Node} + children::Vector{Node} + + # need a unique identifier unique to every *constructed* node + # however, it can be copied when splitting a node + id::Base.UUID + + # the NodeReduction involving this node, if it exists + # Can't use the NodeReduction type here because it's not yet defined + nodeReduction::Union{Operation, Missing} + + # the NodeSplit involving this node, if it exists + nodeSplit::Union{Operation, Missing} + + # the node fusion involving this node, if it exists + nodeFusion::Union{Operation, Missing} +end + +# same as DataTaskNode +mutable struct ComputeTaskNode <: Node + task::AbstractComputeTask + parents::Vector{Node} + children::Vector{Node} + id::Base.UUID + + nodeReduction::Union{Operation, Missing} + nodeSplit::Union{Operation, Missing} + + # for ComputeTasks there can be multiple fusions, unlike the DataTasks + nodeFusions::Vector{Operation} +end + +DataTaskNode(t::AbstractDataTask) = DataTaskNode( + t, + Vector{Node}(), + Vector{Node}(), + UUIDs.uuid1(rng[threadid()]), + missing, + missing, + missing, +) +ComputeTaskNode(t::AbstractComputeTask) = ComputeTaskNode( + t, + Vector{Node}(), + Vector{Node}(), + UUIDs.uuid1(rng[threadid()]), + missing, + missing, + Vector{NodeFusion}(), +) + +struct Edge + # edge points from child to parent + edge::Union{ + Tuple{DataTaskNode, ComputeTaskNode}, + Tuple{ComputeTaskNode, DataTaskNode}, + } +end + +copy(m::Missing) = missing +copy(n::ComputeTaskNode) = ComputeTaskNode( + copy(n.task), + copy(n.parents), + copy(n.children), + UUIDs.uuid1(rng[threadid()]), + copy(n.nodeReduction), + copy(n.nodeSplit), + copy(n.nodeFusions), +) +copy(n::DataTaskNode) = DataTaskNode( + copy(n.task), + copy(n.parents), + copy(n.children), + UUIDs.uuid1(rng[threadid()]), + copy(n.nodeReduction), + copy(n.nodeSplit), + copy(n.nodeFusion), +) diff --git a/src/node/validate.jl b/src/node/validate.jl new file mode 100644 index 0000000..5cd1244 --- /dev/null +++ b/src/node/validate.jl @@ -0,0 +1,43 @@ +function is_valid_node(graph::DAG, node::Node) + @assert node in graph "Node is not part of the given graph!" + + for parent in node.parents + @assert typeof(parent) != typeof(node) "Node's type is the same as its parent's!" + @assert parent in graph "Node's parent is not in the same graph!" + @assert node in parent.children "Node is not a child of its parent!" + end + + for child in node.children + @assert typeof(child) != typeof(node) "Node's type is the same as its child's!" + @assert child in graph "Node's child is not in the same graph!" + @assert node in child.parents "Node is not a parent of its child!" 
+ end + + if !ismissing(node.nodeReduction) + @assert is_valid(graph, node.nodeReduction) + end + if !ismissing(node.nodeSplit) + @assert is_valid(graph, node.nodeSplit) + end + return true +end + +# call with @assert +function is_valid(graph::DAG, node::ComputeTaskNode) + @assert is_valid_node(graph, node) + + for nf in node.nodeFusions + @assert is_valid(graph, nf) + end + return true +end + +# call with @assert +function is_valid(graph::DAG, node::DataTaskNode) + @assert is_valid_node(graph, node) + + if !ismissing(node.nodeFusion) + @assert is_valid(graph, node.nodeFusion) + end + return true +end diff --git a/src/node_functions.jl b/src/node_functions.jl deleted file mode 100644 index 1d53f54..0000000 --- a/src/node_functions.jl +++ /dev/null @@ -1,51 +0,0 @@ -function make_node(t::AbstractTask) - error("Cannot make a node from this task type") -end - -function make_node(t::AbstractDataTask) - return DataTaskNode(t) -end - -function make_node(t::AbstractComputeTask) - return ComputeTaskNode(t) -end - -function make_edge(n1::Node, n2::Node) - error("Can only create edges from compute to data node or reverse") -end - -function make_edge(n1::ComputeTaskNode, n2::DataTaskNode) - return Edge((n1, n2)) -end - -function make_edge(n1::DataTaskNode, n2::ComputeTaskNode) - return Edge((n1, n2)) -end - -function show(io::IO, n::Node) - print(io, "Node(", n.task, ")") -end - -function show(io::IO, e::Edge) - print(io, "Edge(", e.edge[1], ", ", e.edge[2], ")") -end - -function ==(e1::Edge, e2::Edge) - return e1.edge[1] == e2.edge[1] && e1.edge[2] == e2.edge[2] -end - -function ==(n1::Node, n2::Node) - return false -end - -function ==(n1::ComputeTaskNode, n2::ComputeTaskNode) - return n1.id == n2.id -end - -function ==(n1::DataTaskNode, n2::DataTaskNode) - return n1.id == n2.id -end - -copy(m::Missing) = missing -copy(n::ComputeTaskNode) = ComputeTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.nodeReduction), copy(n.nodeSplit), copy(n.nodeFusions)) -copy(n::DataTaskNode) = DataTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.nodeReduction), copy(n.nodeSplit), copy(n.nodeFusion)) diff --git a/src/nodes.jl b/src/nodes.jl deleted file mode 100644 index c1e6b80..0000000 --- a/src/nodes.jl +++ /dev/null @@ -1,56 +0,0 @@ -using Random -using UUIDs -using Base.Threads - -# TODO: reliably find out how many threads we're running with (nthreads() returns 1 when precompiling :/) -rng = [Random.MersenneTwister(0) for _ in 1:32] - -abstract type Node end - -# declare this type here because it's needed -# the specific operations are declared in graph.jl -abstract type Operation end - -mutable struct DataTaskNode <: Node - task::AbstractDataTask - - # use vectors as sets have way too much memory overhead - parents::Vector{Node} - children::Vector{Node} - - # need a unique identifier unique to every *constructed* node - # however, it can be copied when splitting a node - id::Base.UUID - - # the NodeReduction involving this node, if it exists - # Can't use the NodeReduction type here because it's not yet defined - nodeReduction::Union{Operation, Missing} - - # the NodeSplit involving this node, if it exists - nodeSplit::Union{Operation, Missing} - - # the node fusion involving this node, if it exists - nodeFusion::Union{Operation, Missing} -end - -# same as DataTaskNode -mutable struct ComputeTaskNode <: Node - task::AbstractComputeTask - parents::Vector{Node} - children::Vector{Node} - id::Base.UUID - - 
nodeReduction::Union{Operation, Missing} - nodeSplit::Union{Operation, Missing} - - # for ComputeTasks there can be multiple fusions, unlike the DataTasks - nodeFusions::Vector{Operation} -end - -DataTaskNode(t::AbstractDataTask) = DataTaskNode(t, Vector{Node}(), Vector{Node}(), UUIDs.uuid1(rng[threadid()]), missing, missing, missing) -ComputeTaskNode(t::AbstractComputeTask) = ComputeTaskNode(t, Vector{Node}(), Vector{Node}(), UUIDs.uuid1(rng[threadid()]), missing, missing, Vector{NodeFusion}()) - -struct Edge - # edge points from child to parent - edge::Union{Tuple{DataTaskNode, ComputeTaskNode}, Tuple{ComputeTaskNode, DataTaskNode}} -end diff --git a/src/operations/apply.jl b/src/operation/apply.jl similarity index 92% rename from src/operations/apply.jl rename to src/operation/apply.jl index 9be20ad..f61953a 100644 --- a/src/operations/apply.jl +++ b/src/operation/apply.jl @@ -16,11 +16,16 @@ function apply_all!(graph::DAG) end function apply_operation!(graph::DAG, operation::Operation) - error("Unknown operation type!") + return error("Unknown operation type!") end function apply_operation!(graph::DAG, operation::NodeFusion) - diff = node_fusion!(graph, operation.input[1], operation.input[2], operation.input[3]) + diff = node_fusion!( + graph, + operation.input[1], + operation.input[2], + operation.input[3], + ) return AppliedNodeFusion(operation, diff) end @@ -36,7 +41,7 @@ end function revert_operation!(graph::DAG, operation::AppliedOperation) - error("Unknown operation type!") + return error("Unknown operation type!") end function revert_operation!(graph::DAG, operation::AppliedNodeFusion) @@ -74,7 +79,12 @@ function revert_diff!(graph::DAG, diff::Diff) end # Fuse nodes n1 -> n2 -> n3 together into one node, return the applied difference to the graph -function node_fusion!(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode) +function node_fusion!( + graph::DAG, + n1::ComputeTaskNode, + n2::DataTaskNode, + n3::ComputeTaskNode, +) # @assert is_valid_node_fusion_input(graph, n1, n2, n3) # clear snapshot @@ -97,7 +107,8 @@ function node_fusion!(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::Com remove_node!(graph, n3) # create new node with the fused compute task - new_node = ComputeTaskNode(FusedComputeTask{typeof(n1.task),typeof(n3.task)}()) + new_node = + ComputeTaskNode(FusedComputeTask{typeof(n1.task), typeof(n3.task)}()) insert_node!(graph, new_node) # use a set for combined children of n1 and n3 to not get duplicates @@ -136,7 +147,7 @@ function node_reduction!(graph::DAG, nodes::Vector{Node}) n1 = nodes[1] n1_children = children(n1) - + n1_parents = Set(n1.parents) new_parents = Set{Node}() diff --git a/src/operation/clean.jl b/src/operation/clean.jl new file mode 100644 index 0000000..142e21a --- /dev/null +++ b/src/operation/clean.jl @@ -0,0 +1,115 @@ +# functions for "cleaning" nodes, i.e. 
regenerating the possible operations for a node + +# function to find node fusions involving the given node if it's a data node +# pushes the found fusion everywhere it needs to be and returns nothing +function find_fusions!(graph::DAG, node::DataTaskNode) + # if there is already a fusion here, skip + if !ismissing(node.nodeFusion) + return nothing + end + + if length(node.parents) != 1 || length(node.children) != 1 + return nothing + end + + child_node = first(node.children) + parent_node = first(node.parents) + + if !(child_node in graph) || !(parent_node in graph) + error("Parents/Children that are not in the graph!!!") + end + + if length(child_node.parents) != 1 + return nothing + end + + nf = NodeFusion((child_node, node, parent_node)) + push!(graph.possibleOperations.nodeFusions, nf) + push!(child_node.nodeFusions, nf) + node.nodeFusion = nf + push!(parent_node.nodeFusions, nf) + + return nothing +end + + +function find_fusions!(graph::DAG, node::ComputeTaskNode) + # just find fusions in neighbouring DataTaskNodes + for child in node.children + find_fusions!(graph, child) + end + + for parent in node.parents + find_fusions!(graph, parent) + end + + return nothing +end + +function find_reductions!(graph::DAG, node::Node) + # there can only be one reduction per node, avoid adding duplicates + if !ismissing(node.nodeReduction) + return nothing + end + + reductionVector = nothing + # possible reductions are with nodes that are partners, i.e. parents of children + partners_ = partners(node) + delete!(partners_, node) + for partner in partners_ + if partner ∉ graph.nodes + error("Partner is not part of the graph") + end + + if can_reduce(node, partner) + if Set(node.children) != Set(partner.children) + error("Not equal children") + end + if reductionVector === nothing + # only when there's at least one reduction partner, insert the vector + reductionVector = Vector{Node}() + push!(reductionVector, node) + end + + push!(reductionVector, partner) + end + end + + if reductionVector !== nothing + nr = NodeReduction(reductionVector) + push!(graph.possibleOperations.nodeReductions, nr) + for node in reductionVector + if !ismissing(node.nodeReduction) + # it can happen that the dirty node becomes part of an existing NodeReduction and overrides those ones now + # this is only a problem insofar the existing NodeReduction has to be deleted and replaced also in the possibleOperations + invalidate_caches!(graph, node.nodeReduction) + end + node.nodeReduction = nr + end + end + + return nothing +end + +function find_splits!(graph::DAG, node::Node) + if !ismissing(node.nodeSplit) + return nothing + end + + if (can_split(node)) + ns = NodeSplit(node) + push!(graph.possibleOperations.nodeSplits, ns) + node.nodeSplit = ns + end + + return nothing +end + +# "clean" the operations on a dirty node +function clean_node!(graph::DAG, node::Node) + sort_node!(node) + + find_fusions!(graph, node) + find_reductions!(graph, node) + return find_splits!(graph, node) +end diff --git a/src/operation/find.jl b/src/operation/find.jl new file mode 100644 index 0000000..e851e21 --- /dev/null +++ b/src/operation/find.jl @@ -0,0 +1,228 @@ +# functions that find operations on the inital graph + +using Base.Threads + +function insert_operation!( + nf::NodeFusion, + locks::Dict{ComputeTaskNode, SpinLock}, +) + n1 = nf.input[1] + n2 = nf.input[2] + n3 = nf.input[3] + + lock(locks[n1]) do + return push!(nf.input[1].nodeFusions, nf) + end + n2.nodeFusion = nf + lock(locks[n3]) do + return push!(nf.input[3].nodeFusions, nf) + end + 
return nothing +end + +function insert_operation!(nr::NodeReduction) + for n in nr.input + n.nodeReduction = nr + end + return nothing +end + +function insert_operation!(ns::NodeSplit) + ns.input.nodeSplit = ns + return nothing +end + +function nr_insertion!( + operations::PossibleOperations, + nodeReductions::Vector{Vector{NodeReduction}}, +) + total_len = 0 + for vec in nodeReductions + total_len += length(vec) + end + sizehint!(operations.nodeReductions, total_len) + + t = @task for vec in nodeReductions + union!(operations.nodeReductions, Set(vec)) + end + schedule(t) + + @threads for vec in nodeReductions + for op in vec + insert_operation!(op) + end + end + + wait(t) + + return nothing +end + +function nf_insertion!( + graph::DAG, + operations::PossibleOperations, + nodeFusions::Vector{Vector{NodeFusion}}, +) + total_len = 0 + for vec in nodeFusions + total_len += length(vec) + end + sizehint!(operations.nodeFusions, total_len) + + t = @task for vec in nodeFusions + union!(operations.nodeFusions, Set(vec)) + end + schedule(t) + + locks = Dict{ComputeTaskNode, SpinLock}() + for n in graph.nodes + if (typeof(n) <: ComputeTaskNode) + locks[n] = SpinLock() + end + end + + @threads for vec in nodeFusions + for op in vec + insert_operation!(op, locks) + end + end + + wait(t) + + return nothing +end + +function ns_insertion!( + operations::PossibleOperations, + nodeSplits::Vector{Vector{NodeSplit}}, +) + total_len = 0 + for vec in nodeSplits + total_len += length(vec) + end + sizehint!(operations.nodeSplits, total_len) + + t = @task for vec in nodeSplits + union!(operations.nodeSplits, Set(vec)) + end + schedule(t) + + @threads for vec in nodeSplits + for op in vec + insert_operation!(op) + end + end + + wait(t) + + return nothing +end + +# function to generate all possible operations on the graph +function generate_options(graph::DAG) + generatedFusions = [Vector{NodeFusion}() for _ in 1:nthreads()] + generatedReductions = [Vector{NodeReduction}() for _ in 1:nthreads()] + generatedSplits = [Vector{NodeSplit}() for _ in 1:nthreads()] + + # make sure the graph is fully generated through + apply_all!(graph) + + nodeArray = collect(graph.nodes) + + # sort all nodes + @threads for node in nodeArray + sort_node!(node) + end + + checkedNodes = Set{Node}() + checkedNodesLock = SpinLock() + # --- find possible node reductions --- + @threads for node in nodeArray + # we're looking for nodes with multiple parents, those parents can then potentially reduce with one another + if (length(node.parents) <= 1) + continue + end + + candidates = node.parents + + # sort into equivalence classes + trie = NodeTrie() + + for candidate in candidates + # insert into trie + insert!(trie, candidate) + end + + nodeReductions = collect(trie) + + for nrVec in nodeReductions + # parent sets are ordered and any node can only be part of one nodeReduction, so a NodeReduction is uniquely identifiable by its first element + # this prevents duplicate nodeReductions being generated + lock(checkedNodesLock) + if (nrVec[1] in checkedNodes) + unlock(checkedNodesLock) + continue + else + push!(checkedNodes, nrVec[1]) + end + unlock(checkedNodesLock) + + push!(generatedReductions[threadid()], NodeReduction(nrVec)) + end + end + + + # launch thread for node reduction insertion + # remove duplicates + nr_task = @task nr_insertion!(graph.possibleOperations, generatedReductions) + schedule(nr_task) + + # --- find possible node fusions --- + @threads for node in nodeArray + if (typeof(node) <: DataTaskNode) + if length(node.parents) != 
1 + # data node can only have a single parent + continue + end + parent_node = first(node.parents) + + if length(node.children) != 1 + # this node is an entry node or has multiple children which should not be possible + continue + end + child_node = first(node.children) + if (length(child_node.parents) != 1) + continue + end + + push!( + generatedFusions[threadid()], + NodeFusion((child_node, node, parent_node)), + ) + end + end + + # launch thread for node fusion insertion + nf_task = + @task nf_insertion!(graph, graph.possibleOperations, generatedFusions) + schedule(nf_task) + + # find possible node splits + @threads for node in nodeArray + if (can_split(node)) + push!(generatedSplits[threadid()], NodeSplit(node)) + end + end + + # launch thread for node split insertion + ns_task = @task ns_insertion!(graph.possibleOperations, generatedSplits) + schedule(ns_task) + + empty!(graph.dirtyNodes) + + wait(nr_task) + wait(nf_task) + wait(ns_task) + + return nothing +end diff --git a/src/operation/get.jl b/src/operation/get.jl new file mode 100644 index 0000000..b527fb1 --- /dev/null +++ b/src/operation/get.jl @@ -0,0 +1,18 @@ +# function to return the possible operations of a graph + +using Base.Threads + +function get_operations(graph::DAG) + apply_all!(graph) + + if isempty(graph.possibleOperations) + generate_options(graph) + end + + for node in graph.dirtyNodes + clean_node!(graph, node) + end + empty!(graph.dirtyNodes) + + return graph.possibleOperations +end diff --git a/src/operations/print.jl b/src/operation/print.jl similarity index 88% rename from src/operations/print.jl rename to src/operation/print.jl index 742afb9..39f5e8c 100644 --- a/src/operations/print.jl +++ b/src/operation/print.jl @@ -20,12 +20,12 @@ function show(io::IO, op::NodeReduction) print(io, "NR: ") print(io, length(op.input)) print(io, "x") - print(io, op.input[1].task) + return print(io, op.input[1].task) end function show(io::IO, op::NodeSplit) print(io, "NS: ") - print(io, op.input.task) + return print(io, op.input.task) end function show(io::IO, op::NodeFusion) @@ -34,5 +34,5 @@ function show(io::IO, op::NodeFusion) print(io, "->") print(io, op.input[2].task) print(io, "->") - print(io, op.input[3].task) + return print(io, op.input[3].task) end diff --git a/src/operation/type.jl b/src/operation/type.jl new file mode 100644 index 0000000..c9c68b7 --- /dev/null +++ b/src/operation/type.jl @@ -0,0 +1,34 @@ +# An abstract base class for operations +# an operation can be applied to a DAG +abstract type Operation end + +# An abstract base class for already applied operations +# an applied operation can be reversed iff it is the last applied operation on the DAG +abstract type AppliedOperation end + +struct NodeFusion <: Operation + input::Tuple{ComputeTaskNode, DataTaskNode, ComputeTaskNode} +end + +struct AppliedNodeFusion <: AppliedOperation + operation::NodeFusion + diff::Diff +end + +struct NodeReduction <: Operation + input::Vector{Node} +end + +struct AppliedNodeReduction <: AppliedOperation + operation::NodeReduction + diff::Diff +end + +struct NodeSplit <: Operation + input::Node +end + +struct AppliedNodeSplit <: AppliedOperation + operation::NodeSplit + diff::Diff +end diff --git a/src/operation/utility.jl b/src/operation/utility.jl new file mode 100644 index 0000000..97297b0 --- /dev/null +++ b/src/operation/utility.jl @@ -0,0 +1,111 @@ + +function isempty(operations::PossibleOperations) + return isempty(operations.nodeFusions) && + isempty(operations.nodeReductions) && + isempty(operations.nodeSplits) 
+end + +function length(operations::PossibleOperations) + return ( + nodeFusions = length(operations.nodeFusions), + nodeReductions = length(operations.nodeReductions), + nodeSplits = length(operations.nodeSplits), + ) +end + +function delete!(operations::PossibleOperations, op::NodeFusion) + delete!(operations.nodeFusions, op) + return operations +end + +function delete!(operations::PossibleOperations, op::NodeReduction) + delete!(operations.nodeReductions, op) + return operations +end + +function delete!(operations::PossibleOperations, op::NodeSplit) + delete!(operations.nodeSplits, op) + return operations +end + + +function can_fuse(n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode) + if !is_child(n1, n2) || !is_child(n2, n3) + # the checks are redundant but maybe a good sanity check + return false + end + + if length(n2.parents) != 1 || + length(n2.children) != 1 || + length(n1.parents) != 1 + return false + end + + return true +end + +function can_reduce(n1::Node, n2::Node) + if (n1.task != n2.task) + return false + end + + n1_length = length(n1.children) + n2_length = length(n2.children) + + if (n1_length != n2_length) + return false + end + + # this seems to be the most common case so do this first + # doing it manually is a lot faster than using the sets for a general solution + if (n1_length == 2) + if (n1.children[1] != n2.children[1]) + if (n1.children[1] != n2.children[2]) + return false + end + # 1_1 == 2_2 + if (n1.children[2] != n2.children[1]) + return false + end + return true + end + + # 1_1 == 2_1 + if (n1.children[2] != n2.children[2]) + return false + end + return true + end + + # this is simple + if (n1_length == 1) + return n1.children[1] == n2.children[1] + end + + # this takes a long time + return Set(n1.children) == Set(n2.children) +end + +function can_split(n::Node) + return length(parents(n)) > 1 +end + +function ==(op1::Operation, op2::Operation) + return false +end + +function ==(op1::NodeFusion, op2::NodeFusion) + # there can only be one node fusion on a given data task, so if the data task is the same, the fusion is the same + return op1.input[2] == op2.input[2] +end + +function ==(op1::NodeReduction, op2::NodeReduction) + # node reductions are equal exactly if their first input is the same + return op1.input[1].id == op2.input[1].id +end + +function ==(op1::NodeSplit, op2::NodeSplit) + return op1.input == op2.input +end + +copy(id::UUID) = UUID(id.value) diff --git a/src/operation/validate.jl b/src/operation/validate.jl new file mode 100644 index 0000000..5b8dca5 --- /dev/null +++ b/src/operation/validate.jl @@ -0,0 +1,132 @@ +# functions to throw assertion errors for inconsistent or wrong node operations +# should be called with @assert +# the functions throw their own errors though, to still have helpful error messages + +function is_valid_node_fusion_input( + graph::DAG, + n1::ComputeTaskNode, + n2::DataTaskNode, + n3::ComputeTaskNode, +) + if !(n1 in graph) || !(n2 in graph) || !(n3 in graph) + throw( + AssertionError( + "[Node Fusion] The given nodes are not part of the given graph", + ), + ) + end + + if !is_child(n1, n2) || + !is_child(n2, n3) || + !is_parent(n3, n2) || + !is_parent(n2, n1) + throw( + AssertionError( + "[Node Fusion] The given nodes are not connected by edges which is required for node fusion", + ), + ) + end + + if length(n2.parents) > 1 + throw( + AssertionError( + "[Node Fusion] The given data node has more than one parent", + ), + ) + end + if length(n2.children) > 1 + throw( + AssertionError( + "[Node Fusion] The given 
data node has more than one child", + ), + ) + end + if length(n1.parents) > 1 + throw( + AssertionError( + "[Node Fusion] The given n1 has more than one parent", + ), + ) + end + + return true +end + +function is_valid_node_reduction_input(graph::DAG, nodes::Vector{Node}) + for n in nodes + if n ∉ graph + throw( + AssertionError( + "[Node Reduction] The given nodes are not part of the given graph", + ), + ) + end + end + + t = typeof(nodes[1].task) + for n in nodes + if typeof(n.task) != t + throw( + AssertionError( + "[Node Reduction] The given nodes are not of the same type", + ), + ) + end + end + + n1_children = nodes[1].children + for n in nodes + if Set(n1_children) != Set(n.children) + throw( + AssertionError( + "[Node Reduction] The given nodes do not have equal prerequisite nodes which is required for node reduction", + ), + ) + end + end + + return true +end + +function is_valid_node_split_input(graph::DAG, n1::Node) + if n1 ∉ graph + throw( + AssertionError( + "[Node Split] The given node is not part of the given graph", + ), + ) + end + + if length(n1.parents) <= 1 + throw( + AssertionError( + "[Node Split] The given node does not have multiple parents which is required for node split", + ), + ) + end + + return true +end + +function is_valid(graph::DAG, nr::NodeReduction) + @assert is_valid_node_reduction_input(graph, nr.input) + @assert nr in graph.possibleOperations.nodeReductions "NodeReduction is not part of the graph's possible operations!" + return true +end + +function is_valid(graph::DAG, ns::NodeSplit) + @assert is_valid_node_split_input(graph, ns.input) + @assert ns in graph.possibleOperations.nodeSplits "NodeSplit is not part of the graph's possible operations!" + return true +end + +function is_valid(graph::DAG, nf::NodeFusion) + @assert is_valid_node_fusion_input( + graph, + nf.input[1], + nf.input[2], + nf.input[3], + ) + @assert nf in graph.possibleOperations.nodeFusions "NodeFusion is not part of the graph's possible operations!" + return true +end diff --git a/src/operations/clean.jl b/src/operations/clean.jl deleted file mode 100644 index 902012d..0000000 --- a/src/operations/clean.jl +++ /dev/null @@ -1,115 +0,0 @@ -# functions for "cleaning" nodes, i.e. 
regenerating the possible operations for a node - -# function to find node fusions involving the given node if it's a data node -# pushes the found fusion everywhere it needs to be and returns nothing -function find_fusions!(graph::DAG, node::DataTaskNode) - # if there is already a fusion here, skip - if !ismissing(node.nodeFusion) - return nothing - end - - if length(node.parents) != 1 || length(node.children) != 1 - return nothing - end - - child_node = first(node.children) - parent_node = first(node.parents) - - if !(child_node in graph) || !(parent_node in graph) - error("Parents/Children that are not in the graph!!!") - end - - if length(child_node.parents) != 1 - return nothing - end - - nf = NodeFusion((child_node, node, parent_node)) - push!(graph.possibleOperations.nodeFusions, nf) - push!(child_node.nodeFusions, nf) - node.nodeFusion = nf - push!(parent_node.nodeFusions, nf) - - return nothing -end - - -function find_fusions!(graph::DAG, node::ComputeTaskNode) - # just find fusions in neighbouring DataTaskNodes - for child in node.children - find_fusions!(graph, child) - end - - for parent in node.parents - find_fusions!(graph, parent) - end - - return nothing -end - -function find_reductions!(graph::DAG, node::Node) - # there can only be one reduction per node, avoid adding duplicates - if !ismissing(node.nodeReduction) - return nothing - end - - reductionVector = nothing - # possible reductions are with nodes that are partners, i.e. parents of children - partners_ = partners(node) - delete!(partners_, node) - for partner in partners_ - if partner ∉ graph.nodes - error("Partner is not part of the graph") - end - - if can_reduce(node, partner) - if Set(node.children) != Set(partner.children) - error("Not equal children") - end - if reductionVector === nothing - # only when there's at least one reduction partner, insert the vector - reductionVector = Vector{Node}() - push!(reductionVector, node) - end - - push!(reductionVector, partner) - end - end - - if reductionVector !== nothing - nr = NodeReduction(reductionVector) - push!(graph.possibleOperations.nodeReductions, nr) - for node in reductionVector - if !ismissing(node.nodeReduction) - # it can happen that the dirty node becomes part of an existing NodeReduction and overrides those ones now - # this is only a problem insofar the existing NodeReduction has to be deleted and replaced also in the possibleOperations - invalidate_caches!(graph, node.nodeReduction) - end - node.nodeReduction = nr - end - end - - return nothing -end - -function find_splits!(graph::DAG, node::Node) - if !ismissing(node.nodeSplit) - return nothing - end - - if (can_split(node)) - ns = NodeSplit(node) - push!(graph.possibleOperations.nodeSplits, ns) - node.nodeSplit = ns - end - - return nothing -end - -# "clean" the operations on a dirty node -function clean_node!(graph::DAG, node::Node) - sort_node!(node) - - find_fusions!(graph, node) - find_reductions!(graph, node) - find_splits!(graph, node) -end diff --git a/src/operations/find.jl b/src/operations/find.jl deleted file mode 100644 index 1fa0624..0000000 --- a/src/operations/find.jl +++ /dev/null @@ -1,205 +0,0 @@ -# functions that find operations on the inital graph - -using Base.Threads - -function insert_operation!(nf::NodeFusion, locks::Dict{ComputeTaskNode, SpinLock}) - n1 = nf.input[1]; n2 = nf.input[2]; n3 = nf.input[3] - - lock(locks[n1]) do; push!(nf.input[1].nodeFusions, nf); end - nf.input[2].nodeFusion = nf - lock(locks[n3]) do; push!(nf.input[3].nodeFusions, nf); end - return nothing 
-end - -function insert_operation!(nr::NodeReduction) - for n in nr.input - n.nodeReduction = nr - end - return nothing -end - -function insert_operation!(ns::NodeSplit) - ns.input.nodeSplit = ns - return nothing -end - -function nr_insertion!(operations::PossibleOperations, nodeReductions::Vector{Vector{NodeReduction}}) - total_len = 0 - for vec in nodeReductions - total_len += length(vec) - end - sizehint!(operations.nodeReductions, total_len) - - t = @task for vec in nodeReductions - union!(operations.nodeReductions, Set(vec)) - end - schedule(t) - - @threads for vec in nodeReductions - for op in vec - insert_operation!(op) - end - end - - wait(t) - - return nothing -end - -function nf_insertion!(graph::DAG, operations::PossibleOperations, nodeFusions::Vector{Vector{NodeFusion}}) - total_len = 0 - for vec in nodeFusions - total_len += length(vec) - end - sizehint!(operations.nodeFusions, total_len) - - t = @task for vec in nodeFusions - union!(operations.nodeFusions, Set(vec)) - end - schedule(t) - - locks = Dict{ComputeTaskNode, SpinLock}() - for n in graph.nodes - if (typeof(n) <: ComputeTaskNode) - locks[n] = SpinLock() - end - end - - @threads for vec in nodeFusions - for op in vec - insert_operation!(op, locks) - end - end - - wait(t) - - return nothing -end - -function ns_insertion!(operations::PossibleOperations, nodeSplits::Vector{Vector{NodeSplit}}) - total_len = 0 - for vec in nodeSplits - total_len += length(vec) - end - sizehint!(operations.nodeSplits, total_len) - - t = @task for vec in nodeSplits - union!(operations.nodeSplits, Set(vec)) - end - schedule(t) - - @threads for vec in nodeSplits - for op in vec - insert_operation!(op) - end - end - - wait(t) - - return nothing -end - -# function to generate all possible operations on the graph -function generate_options(graph::DAG) - generatedFusions = [Vector{NodeFusion}() for _ in 1:nthreads()] - generatedReductions = [Vector{NodeReduction}() for _ in 1:nthreads()] - generatedSplits = [Vector{NodeSplit}() for _ in 1:nthreads()] - - # make sure the graph is fully generated through - apply_all!(graph) - - nodeArray = collect(graph.nodes) - - # sort all nodes - @threads for node in nodeArray - sort_node!(node) - end - - checkedNodes = Set{Node}() - checkedNodesLock = SpinLock() - # --- find possible node reductions --- - @threads for node in nodeArray - # we're looking for nodes with multiple parents, those parents can then potentially reduce with one another - if (length(node.parents) <= 1) - continue - end - - candidates = node.parents - - # sort into equivalence classes - trie = NodeTrie() - - for candidate in candidates - # insert into trie - insert!(trie, candidate) - end - - nodeReductions = collect(trie) - - for nrVec in nodeReductions - # parent sets are ordered and any node can only be part of one nodeReduction, so a NodeReduction is uniquely identifiable by its first element - # this prevents duplicate nodeReductions being generated - lock(checkedNodesLock) - if (nrVec[1] in checkedNodes) - unlock(checkedNodesLock) - continue - else - push!(checkedNodes, nrVec[1]) - end - unlock(checkedNodesLock) - - push!(generatedReductions[threadid()], NodeReduction(nrVec)) - end - end - - - # launch thread for node reduction insertion - # remove duplicates - nr_task = @task nr_insertion!(graph.possibleOperations, generatedReductions) - schedule(nr_task) - - # --- find possible node fusions --- - @threads for node in nodeArray - if (typeof(node) <: DataTaskNode) - if length(node.parents) != 1 - # data node can only have a single 
parent - continue - end - parent_node = first(node.parents) - - if length(node.children) != 1 - # this node is an entry node or has multiple children which should not be possible - continue - end - child_node = first(node.children) - if (length(child_node.parents) != 1) - continue - end - - push!(generatedFusions[threadid()], NodeFusion((child_node, node, parent_node))) - end - end - - # launch thread for node fusion insertion - nf_task = @task nf_insertion!(graph, graph.possibleOperations, generatedFusions) - schedule(nf_task) - - # find possible node splits - @threads for node in nodeArray - if (can_split(node)) - push!(generatedSplits[threadid()], NodeSplit(node)) - end - end - - # launch thread for node split insertion - ns_task = @task ns_insertion!(graph.possibleOperations, generatedSplits) - schedule(ns_task) - - empty!(graph.dirtyNodes) - - wait(nr_task) - wait(nf_task) - wait(ns_task) - - return nothing -end diff --git a/src/operations/get.jl b/src/operations/get.jl deleted file mode 100644 index 81ef1c0..0000000 --- a/src/operations/get.jl +++ /dev/null @@ -1,18 +0,0 @@ -# function to return the possible operations of a graph - -using Base.Threads - -function get_operations(graph::DAG) - apply_all!(graph) - - if isempty(graph.possibleOperations) - generate_options(graph) - end - - for node in graph.dirtyNodes - clean_node!(graph, node) - end - empty!(graph.dirtyNodes) - - return graph.possibleOperations -end diff --git a/src/operations/utility.jl b/src/operations/utility.jl deleted file mode 100644 index 6bda2fe..0000000 --- a/src/operations/utility.jl +++ /dev/null @@ -1,107 +0,0 @@ - -function isempty(operations::PossibleOperations) - return isempty(operations.nodeFusions) && - isempty(operations.nodeReductions) && - isempty(operations.nodeSplits) -end - -function length(operations::PossibleOperations) - return (nodeFusions = length(operations.nodeFusions), - nodeReductions = length(operations.nodeReductions), - nodeSplits = length(operations.nodeSplits)) -end - -function delete!(operations::PossibleOperations, op::NodeFusion) - delete!(operations.nodeFusions, op) - return operations -end - -function delete!(operations::PossibleOperations, op::NodeReduction) - delete!(operations.nodeReductions, op) - return operations -end - -function delete!(operations::PossibleOperations, op::NodeSplit) - delete!(operations.nodeSplits, op) - return operations -end - - -function can_fuse(n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode) - if !is_child(n1, n2) || !is_child(n2, n3) - # the checks are redundant but maybe a good sanity check - return false - end - - if length(n2.parents) != 1 || length(n2.children) != 1 || length(n1.parents) != 1 - return false - end - - return true -end - -function can_reduce(n1::Node, n2::Node) - if (n1.task != n2.task) - return false - end - - n1_length = length(n1.children) - n2_length = length(n2.children) - - if (n1_length != n2_length) - return false - end - - # this seems to be the most common case so do this first - # doing it manually is a lot faster than using the sets for a general solution - if (n1_length == 2) - if (n1.children[1] != n2.children[1]) - if (n1.children[1] != n2.children[2]) - return false - end - # 1_1 == 2_2 - if (n1.children[2] != n2.children[1]) - return false - end - return true - end - - # 1_1 == 2_1 - if (n1.children[2] != n2.children[2]) - return false - end - return true - end - - # this is simple - if (n1_length == 1) - return n1.children[1] == n2.children[1] - end - - # this takes a long time - return 
Set(n1.children) == Set(n2.children) -end - -function can_split(n::Node) - return length(parents(n)) > 1 -end - -function ==(op1::Operation, op2::Operation) - return false -end - -function ==(op1::NodeFusion, op2::NodeFusion) - # there can only be one node fusion on a given data task, so if the data task is the same, the fusion is the same - return op1.input[2] == op2.input[2] -end - -function ==(op1::NodeReduction, op2::NodeReduction) - # node reductions are equal exactly if their first input is the same - return op1.input[1].id == op2.input[1].id -end - -function ==(op1::NodeSplit, op2::NodeSplit) - return op1.input == op2.input -end - -copy(id::UUID) = UUID(id.value) diff --git a/src/operations/validate.jl b/src/operations/validate.jl deleted file mode 100644 index 95f9929..0000000 --- a/src/operations/validate.jl +++ /dev/null @@ -1,61 +0,0 @@ -# functions to throw assertion errors for inconsistent or wrong node operations -# should be called with @assert -# the functions throw their own errors though, to still have helpful error messages - -function is_valid_node_fusion_input(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::ComputeTaskNode) - if !(n1 in graph) || !(n2 in graph) || !(n3 in graph) - throw(AssertionError("[Node Fusion] The given nodes are not part of the given graph")) - end - - if !is_child(n1, n2) || !is_child(n2, n3) || !is_parent(n3, n2) || !is_parent(n2, n1) - throw(AssertionError("[Node Fusion] The given nodes are not connected by edges which is required for node fusion")) - end - - if length(n2.parents) > 1 - throw(AssertionError("[Node Fusion] The given data node has more than one parent")) - end - if length(n2.children) > 1 - throw(AssertionError("[Node Fusion] The given data node has more than one child")) - end - if length(n1.parents) > 1 - throw(AssertionError("[Node Fusion] The given n1 has more than one parent")) - end - - return true -end - -function is_valid_node_reduction_input(graph::DAG, nodes::Vector{Node}) - for n in nodes - if n ∉ graph - throw(AssertionError("[Node Reduction] The given nodes are not part of the given graph")) - end - end - - t = typeof(nodes[1].task) - for n in nodes - if typeof(n.task) != t - throw(AssertionError("[Node Reduction] The given nodes are not of the same type")) - end - end - - n1_children = nodes[1].children - for n in nodes - if Set(n1_children) != Set(n.children) - throw(AssertionError("[Node Reduction] The given nodes do not have equal prerequisite nodes which is required for node reduction")) - end - end - - return true -end - -function is_valid_node_split_input(graph::DAG, n1::Node) - if n1 ∉ graph - throw(AssertionError("[Node Split] The given node is not part of the given graph")) - end - - if length(n1.parents) <= 1 - throw(AssertionError("[Node Split] The given node does not have multiple parents which is required for node split")) - end - - return true -end diff --git a/src/task/compare.jl b/src/task/compare.jl new file mode 100644 index 0000000..cb31f94 --- /dev/null +++ b/src/task/compare.jl @@ -0,0 +1,11 @@ +function ==(t1::AbstractTask, t2::AbstractTask) + return false +end + +function ==(t1::AbstractComputeTask, t2::AbstractComputeTask) + return typeof(t1) == typeof(t2) +end + +function ==(t1::AbstractDataTask, t2::AbstractDataTask) + return data(t1) == data(t2) +end diff --git a/src/task/print.jl b/src/task/print.jl new file mode 100644 index 0000000..3848e4c --- /dev/null +++ b/src/task/print.jl @@ -0,0 +1,4 @@ +function show(io::IO, t::FusedComputeTask) + (T1, T2) = get_types(t) + return 
print(io, "ComputeFuse(", T1(), ", ", T2(), ")") +end diff --git a/src/task/properties.jl b/src/task/properties.jl new file mode 100644 index 0000000..5b6defc --- /dev/null +++ b/src/task/properties.jl @@ -0,0 +1,33 @@ +function compute(t::AbstractTask; data...) + return error("Need to implement compute()") +end + +function compute_effort(t::AbstractTask) + # default implementation using compute + return error("Need to implement compute_effort()") +end + +function data(t::AbstractTask) + return error("Need to implement data()") +end + +compute_effort(t::AbstractDataTask) = 0 +compute(t::AbstractDataTask; data...) = data +data(t::AbstractDataTask) = getfield(t, :data) + +data(t::AbstractComputeTask) = 0 + +function compute_effort(t::FusedComputeTask) + (T1, T2) = collect(typeof(t).parameters) + return compute_effort(T1()) + compute_effort(T2()) +end + +# actual compute functions for the tasks can stay undefined for now +# compute(t::ComputeTaskU, data::Any) = mycomputation(data) + +function compute_intensity(t::AbstractTask)::UInt64 + if data(t) == 0 + return typemax(UInt64) + end + return compute_effort(t) / data(t) +end diff --git a/src/task/type.jl b/src/task/type.jl new file mode 100644 index 0000000..c9e6f1b --- /dev/null +++ b/src/task/type.jl @@ -0,0 +1,13 @@ +abstract type AbstractTask end + +abstract type AbstractComputeTask <: AbstractTask end +abstract type AbstractDataTask <: AbstractTask end + +struct FusedComputeTask{T1 <: AbstractComputeTask, T2 <: AbstractComputeTask} <: + AbstractComputeTask end + +get_types(::FusedComputeTask{T1, T2}) where {T1, T2} = (T1, T2) + +copy(t::AbstractDataTask) = + error("Need to implement copying for your data tasks!") +copy(t::AbstractComputeTask) = typeof(t)() diff --git a/src/task_functions.jl b/src/task_functions.jl deleted file mode 100644 index d0a371e..0000000 --- a/src/task_functions.jl +++ /dev/null @@ -1,53 +0,0 @@ -function compute(t::AbstractTask; data...) - error("Need to implement compute()") -end - -function compute_effort(t::AbstractTask) - # default implementation using compute - error("Need to implement compute_effort()") -end - -function data(t::AbstractTask) - error("Need to implement data()") -end - -compute_effort(t::AbstractDataTask) = 0 -compute(t::AbstractDataTask; data...) 
= data -data(t::AbstractDataTask) = getfield(t, :data) - -data(t::AbstractComputeTask) = 0 - -function compute_effort(t::FusedComputeTask) - (T1, T2) = collect(typeof(t).parameters) - return compute_effort(T1()) + compute_effort(T2()) -end - -# actual compute functions for the tasks can stay undefined for now -# compute(t::ComputeTaskU, data::Any) = mycomputation(data) - -function compute_intensity(t::AbstractTask)::UInt64 - if data(t) == 0 - return typemax(UInt64) - end - return compute_effort(t) / data(t) -end - -function show(io::IO, t::FusedComputeTask) - (T1, T2) = get_types(t) - print(io, "ComputeFuse(", T1(), ", ", T2(), ")") -end - -function ==(t1::AbstractTask, t2::AbstractTask) - return false -end - -function ==(t1::AbstractComputeTask, t2::AbstractComputeTask) - return typeof(t1) == typeof(t2) -end - -function ==(t1::AbstractDataTask, t2::AbstractDataTask) - return data(t1) == data(t2) -end - -copy(t::AbstractDataTask) = error("Need to implement copying for your data tasks!") -copy(t::AbstractComputeTask) = typeof(t)() diff --git a/src/tasks.jl b/src/tasks.jl deleted file mode 100644 index da9d7f9..0000000 --- a/src/tasks.jl +++ /dev/null @@ -1,9 +0,0 @@ -abstract type AbstractTask end - -abstract type AbstractComputeTask <: AbstractTask end -abstract type AbstractDataTask <: AbstractTask end - -struct FusedComputeTask{T1<:AbstractComputeTask, T2<:AbstractComputeTask} <: AbstractComputeTask -end - -get_types(::FusedComputeTask{T1, T2}) where {T1, T2} = (T1, T2) diff --git a/src/trie.jl b/src/trie.jl index 7b7944a..77ff648 100644 --- a/src/trie.jl +++ b/src/trie.jl @@ -26,14 +26,14 @@ function insert_helper!(trie::NodeIdTrie, node::Node, depth::Int) push!(trie.value, node) return nothing end - + depth = depth + 1 id = node.children[depth].id - + if (!haskey(trie.children, id)) trie.children[id] = NodeIdTrie() end - insert_helper!(trie.children[id], node, depth) + return insert_helper!(trie.children[id], node, depth) end function insert!(trie::NodeTrie, node::Node) @@ -41,7 +41,7 @@ function insert!(trie::NodeTrie, node::Node) if (!haskey(trie.children, t)) trie.children[t] = NodeIdTrie() end - insert_helper!(trie.children[typeof(node.task)], node, 0) + return insert_helper!(trie.children[typeof(node.task)], node, 0) end function collect_helper(trie::NodeIdTrie, acc::Set{Vector{Node}}) @@ -49,7 +49,7 @@ function collect_helper(trie::NodeIdTrie, acc::Set{Vector{Node}}) push!(acc, trie.value) end - for (id,child) in trie.children + for (id, child) in trie.children collect_helper(child, acc) end return nothing @@ -58,7 +58,7 @@ end # returns all sets of multiple nodes that have accumulated in leaves function collect(trie::NodeTrie) acc = Set{Vector{Node}}() - for (t,child) in trie.children + for (t, child) in trie.children collect_helper(child, acc) end return acc diff --git a/src/utility.jl b/src/utility.jl index a163361..96ad4a3 100644 --- a/src/utility.jl +++ b/src/utility.jl @@ -1,11 +1,11 @@ -function bytes_to_human_readable(bytes::Int64) +function bytes_to_human_readable(bytes) units = ["B", "KiB", "MiB", "GiB", "TiB"] unit_index = 1 while bytes >= 1024 && unit_index < length(units) bytes /= 1024 unit_index += 1 end - return string(round(bytes, sigdigits=4), " ", units[unit_index]) + return string(round(bytes, sigdigits = 4), " ", units[unit_index]) end function lt_nodes(n1::Node, n2::Node) @@ -13,6 +13,41 @@ function lt_nodes(n1::Node, n2::Node) end function sort_node!(node::Node) - sort!(node.children, lt=lt_nodes) - sort!(node.parents, lt=lt_nodes) + sort!(node.children, lt 
= lt_nodes)
+    return sort!(node.parents, lt = lt_nodes)
+end
+
+function mem(graph::DAG)
+    size = 0
+    size += Base.summarysize(graph.nodes, exclude = Union{Node})
+    for n in graph.nodes
+        size += mem(n)
+    end
+
+    size += sizeof(graph.appliedOperations)
+    size += sizeof(graph.operationsToApply)
+
+    size += sizeof(graph.possibleOperations)
+    for op in graph.possibleOperations.nodeFusions
+        size += mem(op)
+    end
+    for op in graph.possibleOperations.nodeReductions
+        size += mem(op)
+    end
+    for op in graph.possibleOperations.nodeSplits
+        size += mem(op)
+    end
+
+    size += Base.summarysize(graph.dirtyNodes, exclude = Union{Node})
+    return size += sizeof(graph.diff)
+end
+
+# calculate the size of this operation in bytes
+function mem(op::Operation)
+    return Base.summarysize(op, exclude = Union{Node})
+end
+
+# calculate the size of this node in bytes
+function mem(node::Node)
+    return Base.summarysize(node, exclude = Union{Node, Operation})
 end
diff --git a/test/known_graphs.jl b/test/known_graphs.jl
index b4cb310..cbbb5a1 100644
--- a/test/known_graphs.jl
+++ b/test/known_graphs.jl
@@ -1,85 +1,89 @@
 using Random
 
-function test_known_graph(name::String, n, fusion_test=true)
-@testset "Test $name Graph ($n)" begin
-    graph = parse_abc(joinpath(@__DIR__, "..", "input", "$name.txt"))
-    props = graph_properties(graph)
+function test_known_graph(name::String, n, fusion_test = true)
+    @testset "Test $name Graph ($n)" begin
+        graph = parse_abc(joinpath(@__DIR__, "..", "input", "$name.txt"))
+        props = graph_properties(graph)
 
-    if (fusion_test)
-        test_node_fusion(graph)
+        if (fusion_test)
+            test_node_fusion(graph)
+        end
+        test_random_walk(graph, n)
     end
-    test_random_walk(graph, n)
-end
 end
 
 function test_node_fusion(g::DAG)
-@testset "Test Node Fusion" begin
-    props = graph_properties(g)
-
-    options = get_operations(g)
-
-    nodes_number = length(g.nodes)
-    data = props.data
-    compute_effort = props.compute_effort
-
-    while !isempty(options.nodeFusions)
-        fusion = first(options.nodeFusions)
-
-        @test typeof(fusion) <: NodeFusion
-
-        push_operation!(g, fusion)
-
+    @testset "Test Node Fusion" begin
         props = graph_properties(g)
-        @test props.data < data
-        @test props.compute_effort == compute_effort
+
+        options = get_operations(g)
 
         nodes_number = length(g.nodes)
         data = props.data
         compute_effort = props.compute_effort
 
-        options = get_operations(g)
+        while !isempty(options.nodeFusions)
+            fusion = first(options.nodeFusions)
+
+            @test typeof(fusion) <: NodeFusion
+
+            push_operation!(g, fusion)
+
+            props = graph_properties(g)
+            @test props.data < data
+            @test props.compute_effort == compute_effort
+
+            nodes_number = length(g.nodes)
+            data = props.data
+            compute_effort = props.compute_effort
+
+            options = get_operations(g)
+        end
     end
 end
-end
 
 function test_random_walk(g::DAG, n::Int64)
-@testset "Test Random Walk ($n)" begin
-    # the purpose here is to do "random" operations and reverse them again and validate that the graph stays the same and doesn't diverge
-    reset_graph!(g)
+    @testset "Test Random Walk ($n)" begin
+        # the purpose here is to do "random" operations and reverse them again and validate that the graph stays the same and doesn't diverge
+        reset_graph!(g)
 
-    properties = graph_properties(g)
+        @test is_valid(g)
 
-    for i = 1:n
-        # choose push or pop
-        if rand(Bool)
-            # push
-            opt = get_operations(g)
+        properties = graph_properties(g)
 
-            # choose one of fuse/split/reduce
-            option = rand(1:3)
-            if option == 1 && !isempty(opt.nodeFusions)
-                push_operation!(g, rand(collect(opt.nodeFusions)))
-            elseif option == 2 && 
!isempty(opt.nodeReductions) - push_operation!(g, rand(collect(opt.nodeReductions))) - elseif option == 3 && !isempty(opt.nodeSplits) - push_operation!(g, rand(collect(opt.nodeSplits))) + for i in 1:n + # choose push or pop + if rand(Bool) + # push + opt = get_operations(g) + + # choose one of fuse/split/reduce + option = rand(1:3) + if option == 1 && !isempty(opt.nodeFusions) + push_operation!(g, rand(collect(opt.nodeFusions))) + elseif option == 2 && !isempty(opt.nodeReductions) + push_operation!(g, rand(collect(opt.nodeReductions))) + elseif option == 3 && !isempty(opt.nodeSplits) + push_operation!(g, rand(collect(opt.nodeSplits))) + else + i = i - 1 + end else - i = i - 1 - end - else - # pop - if (can_pop(g)) - pop_operation!(g) - else - i = i - 1 + # pop + if (can_pop(g)) + pop_operation!(g) + else + i = i - 1 + end end end + + reset_graph!(g) + + @test is_valid(g) + + @test properties == graph_properties(g) end - - reset_graph!(g) - - @test properties == graph_properties(g) -end end Random.seed!(0) diff --git a/test/node_reduction.jl b/test/node_reduction.jl index 646035f..47c6255 100644 --- a/test/node_reduction.jl +++ b/test/node_reduction.jl @@ -51,6 +51,8 @@ import MetagraphOptimization.make_node insert_edge!(graph, BD, B1C_2, false) insert_edge!(graph, CD, C1C, false) + @test is_valid(graph) + @test is_exit_node(d_exit) @test is_entry_node(AD) @test is_entry_node(BD) @@ -75,6 +77,8 @@ import MetagraphOptimization.make_node push_operation!(graph, nr) opt = get_operations(graph) + @test is_valid(graph) + @test length(opt) == (nodeFusions = 4, nodeReductions = 0, nodeSplits = 1) #println("After 2 Node Reductions:\n", opt) @@ -89,5 +93,7 @@ import MetagraphOptimization.make_node opt = get_operations(graph) @test length(opt) == (nodeFusions = 6, nodeReductions = 1, nodeSplits = 1) #println("After reverting to the initial state:\n", opt) + + @test is_valid(graph) end println("Node Reduction Unit Tests Complete!") diff --git a/test/unit_tests_graph.jl b/test/unit_tests_graph.jl index 20dd5fa..32efa11 100644 --- a/test/unit_tests_graph.jl +++ b/test/unit_tests_graph.jl @@ -11,8 +11,10 @@ import MetagraphOptimization.partners @test length(graph.appliedOperations) == 0 @test length(graph.operationsToApply) == 0 @test length(graph.dirtyNodes) == 0 - @test length(graph.diff) == (addedNodes = 0, removedNodes = 0, addedEdges = 0, removedEdges = 0) - @test length(get_operations(graph)) == (nodeFusions = 0, nodeReductions = 0, nodeSplits = 0) + @test length(graph.diff) == + (addedNodes = 0, removedNodes = 0, addedEdges = 0, removedEdges = 0) + @test length(get_operations(graph)) == + (nodeFusions = 0, nodeReductions = 0, nodeSplits = 0) # s to output (exit node) d_exit = insert_node!(graph, make_node(DataTask(10)), false) @@ -105,7 +107,10 @@ import MetagraphOptimization.partners @test length(graph.appliedOperations) == 0 @test length(graph.operationsToApply) == 0 @test length(graph.dirtyNodes) == 26 - @test length(graph.diff) == (addedNodes = 0, removedNodes = 0, addedEdges = 0, removedEdges = 0) + @test length(graph.diff) == + (addedNodes = 0, removedNodes = 0, addedEdges = 0, removedEdges = 0) + + @test is_valid(graph) @test is_entry_node(d_PB) @test is_entry_node(d_PA) @@ -130,7 +135,8 @@ import MetagraphOptimization.partners @test length(siblings(s0)) == 1 operations = get_operations(graph) - @test length(operations) == (nodeFusions = 10, nodeReductions = 0, nodeSplits = 0) + @test length(operations) == + (nodeFusions = 10, nodeReductions = 0, nodeSplits = 0) @test 
length(graph.dirtyNodes) == 0 @test operations == get_operations(graph) @@ -139,7 +145,7 @@ import MetagraphOptimization.partners properties = graph_properties(graph) @test properties.compute_effort == 134 @test properties.data == 62 - @test properties.compute_intensity ≈ 134/62 + @test properties.compute_intensity ≈ 134 / 62 @test properties.nodes == 26 @test properties.edges == 25 @@ -151,7 +157,8 @@ import MetagraphOptimization.partners @test length(graph.operationsToApply) == 1 @test first(graph.operationsToApply) == nf @test length(graph.dirtyNodes) == 0 - @test length(graph.diff) == (addedNodes = 0, removedNodes = 0, addedEdges = 0, removedEdges = 0) + @test length(graph.diff) == + (addedNodes = 0, removedNodes = 0, addedEdges = 0, removedEdges = 0) # this applies pending operations properties = graph_properties(graph) @@ -164,12 +171,13 @@ import MetagraphOptimization.partners @test properties.edges == 23 @test properties.compute_effort == 134 @test properties.data < 62 - @test properties.compute_intensity > 134/62 + @test properties.compute_intensity > 134 / 62 operations = get_operations(graph) @test length(graph.dirtyNodes) == 0 - @test length(operations) == (nodeFusions = 9, nodeReductions = 0, nodeSplits = 0) + @test length(operations) == + (nodeFusions = 9, nodeReductions = 0, nodeSplits = 0) @test !isempty(operations) possibleNF = 9 @@ -177,12 +185,14 @@ import MetagraphOptimization.partners push_operation!(graph, first(operations.nodeFusions)) operations = get_operations(graph) possibleNF = possibleNF - 1 - @test length(operations) == (nodeFusions = possibleNF, nodeReductions = 0, nodeSplits = 0) + @test length(operations) == + (nodeFusions = possibleNF, nodeReductions = 0, nodeSplits = 0) end @test isempty(operations) - @test length(operations) == (nodeFusions = 0, nodeReductions = 0, nodeSplits = 0) + @test length(operations) == + (nodeFusions = 0, nodeReductions = 0, nodeSplits = 0) @test length(graph.dirtyNodes) == 0 @test length(graph.nodes) == 6 @test length(graph.appliedOperations) == 10 @@ -200,9 +210,12 @@ import MetagraphOptimization.partners @test properties.edges == 25 @test properties.compute_effort == 134 @test properties.data == 62 - @test properties.compute_intensity ≈ 134/62 + @test properties.compute_intensity ≈ 134 / 62 operations = get_operations(graph) - @test length(operations) == (nodeFusions = 10, nodeReductions = 0, nodeSplits = 0) + @test length(operations) == + (nodeFusions = 10, nodeReductions = 0, nodeSplits = 0) + + @test is_valid(graph) end println("Graph Unit Tests Complete!") diff --git a/test/unit_tests_nodes.jl b/test/unit_tests_nodes.jl index 7a274d0..74be0e8 100644 --- a/test/unit_tests_nodes.jl +++ b/test/unit_tests_nodes.jl @@ -3,7 +3,8 @@ nC1 = MetagraphOptimization.make_node(MetagraphOptimization.ComputeTaskU()) nC2 = MetagraphOptimization.make_node(MetagraphOptimization.ComputeTaskV()) nC3 = MetagraphOptimization.make_node(MetagraphOptimization.ComputeTaskP()) - nC4 = MetagraphOptimization.make_node(MetagraphOptimization.ComputeTaskSum()) + nC4 = + MetagraphOptimization.make_node(MetagraphOptimization.ComputeTaskSum()) nD1 = MetagraphOptimization.make_node(MetagraphOptimization.DataTask(10)) nD2 = MetagraphOptimization.make_node(MetagraphOptimization.DataTask(20)) diff --git a/test/unit_tests_utility.jl b/test/unit_tests_utility.jl index db04d80..169023e 100644 --- a/test/unit_tests_utility.jl +++ b/test/unit_tests_utility.jl @@ -5,7 +5,9 @@ @test MetagraphOptimization.bytes_to_human_readable(1025) == "1.001 KiB" @test 
MetagraphOptimization.bytes_to_human_readable(684235) == "668.2 KiB" @test MetagraphOptimization.bytes_to_human_readable(86214576) == "82.22 MiB" - @test MetagraphOptimization.bytes_to_human_readable(9241457698) == "8.607 GiB" - @test MetagraphOptimization.bytes_to_human_readable(3218598654367) == "2.927 TiB" + @test MetagraphOptimization.bytes_to_human_readable(9241457698) == + "8.607 GiB" + @test MetagraphOptimization.bytes_to_human_readable(3218598654367) == + "2.927 TiB" end println("Utility Unit Tests Complete!")
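Illustrative usage sketch (not part of the patch): the snippet below shows how the reworked operation API and the new memory helpers introduced in this change would typically be exercised together. The input path "AB->AB.txt" and the surrounding script structure are placeholders chosen for illustration; only the function names (parse_abc, get_operations, push_operation!, pop_operation!, can_pop, is_valid, graph_properties, mem, bytes_to_human_readable) come from this diff and its tests.

using MetagraphOptimization

# parse an ABC-model graph file; the exact path is an assumption for this example
graph = parse_abc(joinpath(@__DIR__, "input", "AB->AB.txt"))

# new validation helper added in this change
@assert is_valid(graph)

# report the graph's memory footprint using the new mem() helper
println(
    "Graph size in memory: ",
    MetagraphOptimization.bytes_to_human_readable(MetagraphOptimization.mem(graph)),
)

# query the possible operations and apply one node fusion, if any exist
opt = get_operations(graph)
if !isempty(opt.nodeFusions)
    push_operation!(graph, first(opt.nodeFusions))
end

# the most recently applied operation can be reverted again
if can_pop(graph)
    pop_operation!(graph)
end

@assert is_valid(graph)
println(graph_properties(graph))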