From 3454370a37b46bee0aceaa463d8dd3bab47d07f1 Mon Sep 17 00:00:00 2001 From: Anton Reinhard Date: Tue, 22 Aug 2023 13:21:26 +0200 Subject: [PATCH] Multithreaded Node Reduction inserttion --- examples/import_bench.jl | 2 +- scripts/bench_threads.fish | 26 +++++++-------- src/MetagraphOptimization.jl | 5 +-- src/abc_model/tasks.jl | 52 +++++++++++++++-------------- src/operations/find.jl | 60 ++++++++++++++++++++++++--------- src/trie.jl | 65 ++++++++++++++++++++++++++++++++++++ src/utility.jl | 46 +------------------------ 7 files changed, 155 insertions(+), 101 deletions(-) create mode 100644 src/trie.jl diff --git a/examples/import_bench.jl b/examples/import_bench.jl index 10cbdfe..c52463d 100644 --- a/examples/import_bench.jl +++ b/examples/import_bench.jl @@ -32,7 +32,7 @@ function import_bench() bench_txt("AB->ABBB.txt") bench_txt("AB->ABBBBB.txt") bench_txt("AB->ABBBBBBB.txt") - #bench_txt("AB->ABBBBBBBBB.txt", false) + #bench_txt("AB->ABBBBBBBBB.txt") bench_txt("ABAB->ABAB.txt") bench_txt("ABAB->ABC.txt") end diff --git a/scripts/bench_threads.fish b/scripts/bench_threads.fish index 738249e..13c5582 100755 --- a/scripts/bench_threads.fish +++ b/scripts/bench_threads.fish @@ -1,23 +1,23 @@ #!/bin/fish set minthreads 1 -set maxthreads 6 +set maxthreads 8 julia --project=./examples -t 4 -e 'import Pkg; Pkg.instantiate()' -for i in $(seq $minthreads $maxthreads) - printf "(AB->AB, $i) " - julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->AB.txt"))' -end +#for i in $(seq $minthreads $maxthreads) +# printf "(AB->AB, $i) " +# julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->AB.txt"))' +#end -for i in $(seq $minthreads $maxthreads) - printf "(AB->ABBB, $i) " - julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBB.txt"))' -end +#for i in $(seq $minthreads $maxthreads) +# printf "(AB->ABBB, $i) " +# julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBB.txt"))' +#end -for i in $(seq $minthreads $maxthreads) - printf "(AB->ABBBBB, $i) " - julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBBBB.txt"))' -end +#for i in $(seq $minthreads $maxthreads) +# printf "(AB->ABBBBB, $i) " +# julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBBBB.txt"))' +#end for i in $(seq $minthreads $maxthreads) printf "(AB->ABBBBBBB, $i) " diff --git a/src/MetagraphOptimization.jl b/src/MetagraphOptimization.jl index 82012a2..bf8e60f 100644 --- a/src/MetagraphOptimization.jl +++ b/src/MetagraphOptimization.jl @@ -25,6 +25,9 @@ include("tasks.jl") include("nodes.jl") include("graph.jl") +include("trie.jl") +include("utility.jl") + include("task_functions.jl") include("node_functions.jl") include("graph_functions.jl") @@ -37,8 +40,6 @@ include("operations/get.jl") include("graph_interface.jl") -include("utility.jl") - include("abc_model/tasks.jl") include("abc_model/task_functions.jl") include("abc_model/parse.jl") diff --git a/src/abc_model/tasks.jl b/src/abc_model/tasks.jl index 028850b..40b095e 100644 --- a/src/abc_model/tasks.jl +++ b/src/abc_model/tasks.jl @@ -1,27 +1,29 @@ struct DataTask <: AbstractDataTask data::UInt64 - end - - # S task with 1 child - struct ComputeTaskS1 <: AbstractComputeTask - end - - # S task with 2 children - struct ComputeTaskS2 <: AbstractComputeTask - end - - # P task with 0 children - struct ComputeTaskP <: AbstractComputeTask - end - - # v task with 2 children - struct ComputeTaskV <: AbstractComputeTask - end - - # u task with 1 child - struct ComputeTaskU <: AbstractComputeTask - end - - # task that sums all its inputs, n children - struct ComputeTaskSum <: AbstractComputeTask - end +end + +# S task with 1 child +struct ComputeTaskS1 <: AbstractComputeTask +end + +# S task with 2 children +struct ComputeTaskS2 <: AbstractComputeTask +end + +# P task with 0 children +struct ComputeTaskP <: AbstractComputeTask +end + +# v task with 2 children +struct ComputeTaskV <: AbstractComputeTask +end + +# u task with 1 child +struct ComputeTaskU <: AbstractComputeTask +end + +# task that sums all its inputs, n children +struct ComputeTaskSum <: AbstractComputeTask +end + +ABC_TASKS = [DataTask, ComputeTaskS1, ComputeTaskS2, ComputeTaskP, ComputeTaskV, ComputeTaskU, ComputeTaskSum] diff --git a/src/operations/find.jl b/src/operations/find.jl index 006aa99..948d73c 100644 --- a/src/operations/find.jl +++ b/src/operations/find.jl @@ -12,6 +12,8 @@ function insert_operation!(operations::PossibleOperations, nf::NodeFusion, locks end function insert_operation!(operations::PossibleOperations, nr::NodeReduction, locks::Dict{Node, SpinLock}) + # since node parents were sorted before, the NodeReductions contain elements in a known order + # this, together with the locking, means that we can safely do the following without inserting duplicates first = true for n in nr.input skip_duplicate = false @@ -34,7 +36,6 @@ function insert_operation!(operations::PossibleOperations, nr::NodeReduction, lo push!(n.operations, nr) unlock(locks[n]) end - push!(operations.nodeReductions, nr) return nothing end @@ -50,12 +51,19 @@ function nr_insertion!(operations::PossibleOperations, nodeReductions::Vector{Ve end sizehint!(operations.nodeReductions, total_len) - @time for vec in nodeReductions + t = @task for vec in nodeReductions + union!(operations.nodeReductions, Set(vec)) + end + schedule(t) + + @threads for vec in nodeReductions for op in vec insert_operation!(operations, op, locks) end end - println(" Time for NR insertion") + + wait(t) + return nothing end @@ -65,14 +73,20 @@ function nf_insertion!(operations::PossibleOperations, nodeFusions::Vector{Vecto total_len += length(vec) end sizehint!(operations.nodeFusions, total_len) - - @time for vec in nodeFusions + + t = @task for vec in nodeFusions union!(operations.nodeFusions, Set(vec)) + end + schedule(t) + + @threads for vec in nodeFusions for op in vec insert_operation!(operations, op, locks) end end - println(" Time for NF insertion") + + wait(t) + return nothing end @@ -83,13 +97,19 @@ function ns_insertion!(operations::PossibleOperations, nodeSplits::Vector{Vector end sizehint!(operations.nodeSplits, total_len) - @time for vec in nodeSplits + t = @task for vec in nodeSplits union!(operations.nodeSplits, Set(vec)) + end + schedule(t) + + @threads for vec in nodeSplits for op in vec insert_operation!(operations, op, locks) end end - println(" Time for NS insertion") + + wait(t) + return nothing end @@ -114,6 +134,8 @@ function generate_options(graph::DAG) sort_node!(node) end + checkedNodes = Set{Node}() + checkedNodesLock = SpinLock() # --- find possible node reductions --- @threads for node in nodeArray # we're looking for nodes with multiple parents, those parents can then potentially reduce with one another @@ -121,12 +143,9 @@ function generate_options(graph::DAG) continue end - candidates = parents(node) - - nodeReductions = Set{Set{Node}}() + candidates = node.parents # sort into equivalence classes - #**TODO** check that only same types can reduce trie = NodeTrie() for candidate in candidates @@ -136,14 +155,25 @@ function generate_options(graph::DAG) nodeReductions = collect(trie) - for nrSet in nodeReductions - push!(generatedReductions[threadid()], NodeReduction(collect(nrSet))) + for nrVec in nodeReductions + # parent sets are ordered and any node can only be part of one nodeReduction, so a NodeReduction is uniquely identifiable by its first element + # this prevents duplicate nodeReductions being generated + lock(checkedNodesLock) + if (nrVec[1] in checkedNodes) + unlock(checkedNodesLock) + continue + else + push!(checkedNodes, nrVec[1]) + end + unlock(checkedNodesLock) + + push!(generatedReductions[threadid()], NodeReduction(nrVec)) end end # launch thread for node reduction insertion - # removeduplicates + # remove duplicates nr_task = @task nr_insertion!(graph.possibleOperations, generatedReductions, locks) schedule(nr_task) diff --git a/src/trie.jl b/src/trie.jl new file mode 100644 index 0000000..7b7944a --- /dev/null +++ b/src/trie.jl @@ -0,0 +1,65 @@ + +# helper struct for NodeTrie +mutable struct NodeIdTrie + value::Vector{Node} + children::Dict{UUID, NodeIdTrie} +end + +# Trie data structure for node reduction, inserts nodes by children +# Assumes that given nodes have ordered vectors of children (see sort_node) +# First level is the task type and thus does not have a value +# Should be constructed with all Types that will be used +mutable struct NodeTrie + children::Dict{DataType, NodeIdTrie} +end + +function NodeTrie() + return NodeTrie(Dict{DataType, NodeIdTrie}()) +end + +function NodeIdTrie() + return NodeIdTrie(Vector{Node}(), Dict{UUID, NodeIdTrie}()) +end + +function insert_helper!(trie::NodeIdTrie, node::Node, depth::Int) + if (length(node.children) == depth) + push!(trie.value, node) + return nothing + end + + depth = depth + 1 + id = node.children[depth].id + + if (!haskey(trie.children, id)) + trie.children[id] = NodeIdTrie() + end + insert_helper!(trie.children[id], node, depth) +end + +function insert!(trie::NodeTrie, node::Node) + t = typeof(node.task) + if (!haskey(trie.children, t)) + trie.children[t] = NodeIdTrie() + end + insert_helper!(trie.children[typeof(node.task)], node, 0) +end + +function collect_helper(trie::NodeIdTrie, acc::Set{Vector{Node}}) + if (length(trie.value) >= 2) + push!(acc, trie.value) + end + + for (id,child) in trie.children + collect_helper(child, acc) + end + return nothing +end + +# returns all sets of multiple nodes that have accumulated in leaves +function collect(trie::NodeTrie) + acc = Set{Vector{Node}}() + for (t,child) in trie.children + collect_helper(child, acc) + end + return acc +end diff --git a/src/utility.jl b/src/utility.jl index 180004b..a163361 100644 --- a/src/utility.jl +++ b/src/utility.jl @@ -8,55 +8,11 @@ function bytes_to_human_readable(bytes::Int64) return string(round(bytes, sigdigits=4), " ", units[unit_index]) end -# Trie data structure for node reduction, inserts nodes by children -# Assumes that given nodes have ordered sets of children (see sort_node) -mutable struct NodeTrie - value::Set{Node} - children::Dict{UUID, NodeTrie} -end - -NodeTrie() = NodeTrie(Set{Node}(), Dict{UUID, NodeTrie}()) - -function insert_helper!(trie::NodeTrie, node::Node, depth::Int) - if (length(node.children) == depth) - push!(trie.value, node) - return nothing - end - - depth = depth + 1 - id = node.children[depth].id - if (!haskey(trie.children, id)) - trie.children[id] = NodeTrie() - end - insert_helper!(trie.children[id], node, depth) -end - -function insert!(trie::NodeTrie, node::Node) - insert_helper!(trie, node, 0) -end - -function collect_helper(trie::NodeTrie, acc::Set{Set{Node}}) - if (length(trie.value) >= 2) - push!(acc, trie.value) - end - - for (id,child) in trie.children - collect_helper(child, acc) - end - return nothing -end - -# returns all sets of multiple nodes that have accumulated in leaves -function collect(trie::NodeTrie) - acc = Set{Set{Node}}() - collect_helper(trie, acc) - return acc -end - function lt_nodes(n1::Node, n2::Node) return n1.id < n2.id end function sort_node!(node::Node) sort!(node.children, lt=lt_nodes) + sort!(node.parents, lt=lt_nodes) end