Multithreaded Node Reduction insertion

This commit is contained in:
Anton Reinhard 2023-08-22 13:21:26 +02:00
parent 45e35dd526
commit 3454370a37
7 changed files with 155 additions and 101 deletions

@ -32,7 +32,7 @@ function import_bench()
bench_txt("AB->ABBB.txt")
bench_txt("AB->ABBBBB.txt")
bench_txt("AB->ABBBBBBB.txt")
#bench_txt("AB->ABBBBBBBBB.txt", false)
#bench_txt("AB->ABBBBBBBBB.txt")
bench_txt("ABAB->ABAB.txt")
bench_txt("ABAB->ABC.txt")
end

@ -1,23 +1,23 @@
#!/bin/fish
set minthreads 1
set maxthreads 6
set maxthreads 8
julia --project=./examples -t 4 -e 'import Pkg; Pkg.instantiate()'
for i in $(seq $minthreads $maxthreads)
printf "(AB->AB, $i) "
julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->AB.txt"))'
end
#for i in $(seq $minthreads $maxthreads)
# printf "(AB->AB, $i) "
# julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->AB.txt"))'
#end
for i in $(seq $minthreads $maxthreads)
printf "(AB->ABBB, $i) "
julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBB.txt"))'
end
#for i in $(seq $minthreads $maxthreads)
# printf "(AB->ABBB, $i) "
# julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBB.txt"))'
#end
for i in $(seq $minthreads $maxthreads)
printf "(AB->ABBBBB, $i) "
julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBBBB.txt"))'
end
#for i in $(seq $minthreads $maxthreads)
# printf "(AB->ABBBBB, $i) "
# julia --project=./examples -t $i -e 'using MetagraphOptimization; using BenchmarkTools; @btime get_operations(graph) setup=(graph = parse_abc("examples/AB->ABBBBB.txt"))'
#end
for i in $(seq $minthreads $maxthreads)
printf "(AB->ABBBBBBB, $i) "

@ -25,6 +25,9 @@ include("tasks.jl")
include("nodes.jl")
include("graph.jl")
include("trie.jl")
include("utility.jl")
include("task_functions.jl")
include("node_functions.jl")
include("graph_functions.jl")
@ -37,8 +40,6 @@ include("operations/get.jl")
include("graph_interface.jl")
include("utility.jl")
include("abc_model/tasks.jl")
include("abc_model/task_functions.jl")
include("abc_model/parse.jl")

@ -1,27 +1,29 @@
struct DataTask <: AbstractDataTask
data::UInt64
end
# S task with 1 child
struct ComputeTaskS1 <: AbstractComputeTask
end
# S task with 2 children
struct ComputeTaskS2 <: AbstractComputeTask
end
# P task with 0 children
struct ComputeTaskP <: AbstractComputeTask
end
# v task with 2 children
struct ComputeTaskV <: AbstractComputeTask
end
# u task with 1 child
struct ComputeTaskU <: AbstractComputeTask
end
# task that sums all its inputs, n children
struct ComputeTaskSum <: AbstractComputeTask
end
end
# S task with 1 child
struct ComputeTaskS1 <: AbstractComputeTask
end
# S task with 2 children
struct ComputeTaskS2 <: AbstractComputeTask
end
# P task with 0 children
struct ComputeTaskP <: AbstractComputeTask
end
# v task with 2 children
struct ComputeTaskV <: AbstractComputeTask
end
# u task with 1 child
struct ComputeTaskU <: AbstractComputeTask
end
# task that sums all its inputs, n children
struct ComputeTaskSum <: AbstractComputeTask
end
ABC_TASKS = [DataTask, ComputeTaskS1, ComputeTaskS2, ComputeTaskP, ComputeTaskV, ComputeTaskU, ComputeTaskSum]

@ -12,6 +12,8 @@ function insert_operation!(operations::PossibleOperations, nf::NodeFusion, locks
end
function insert_operation!(operations::PossibleOperations, nr::NodeReduction, locks::Dict{Node, SpinLock})
# since node parents were sorted before, the NodeReductions contain elements in a known order
# this, together with the locking, means that we can safely do the following without inserting duplicates
first = true
for n in nr.input
skip_duplicate = false
@ -34,7 +36,6 @@ function insert_operation!(operations::PossibleOperations, nr::NodeReduction, lo
push!(n.operations, nr)
unlock(locks[n])
end
push!(operations.nodeReductions, nr)
return nothing
end
@ -50,12 +51,19 @@ function nr_insertion!(operations::PossibleOperations, nodeReductions::Vector{Ve
end
sizehint!(operations.nodeReductions, total_len)
@time for vec in nodeReductions
t = @task for vec in nodeReductions
union!(operations.nodeReductions, Set(vec))
end
schedule(t)
@threads for vec in nodeReductions
for op in vec
insert_operation!(operations, op, locks)
end
end
println(" Time for NR insertion")
wait(t)
return nothing
end
@ -65,14 +73,20 @@ function nf_insertion!(operations::PossibleOperations, nodeFusions::Vector{Vecto
total_len += length(vec)
end
sizehint!(operations.nodeFusions, total_len)
@time for vec in nodeFusions
t = @task for vec in nodeFusions
union!(operations.nodeFusions, Set(vec))
end
schedule(t)
@threads for vec in nodeFusions
for op in vec
insert_operation!(operations, op, locks)
end
end
println(" Time for NF insertion")
wait(t)
return nothing
end
@ -83,13 +97,19 @@ function ns_insertion!(operations::PossibleOperations, nodeSplits::Vector{Vector
end
sizehint!(operations.nodeSplits, total_len)
@time for vec in nodeSplits
t = @task for vec in nodeSplits
union!(operations.nodeSplits, Set(vec))
end
schedule(t)
@threads for vec in nodeSplits
for op in vec
insert_operation!(operations, op, locks)
end
end
println(" Time for NS insertion")
wait(t)
return nothing
end
@ -114,6 +134,8 @@ function generate_options(graph::DAG)
sort_node!(node)
end
checkedNodes = Set{Node}()
checkedNodesLock = SpinLock()
# --- find possible node reductions ---
@threads for node in nodeArray
# we're looking for nodes with multiple parents, those parents can then potentially reduce with one another
@ -121,12 +143,9 @@ function generate_options(graph::DAG)
continue
end
candidates = parents(node)
nodeReductions = Set{Set{Node}}()
candidates = node.parents
# sort into equivalence classes
#**TODO** check that only same types can reduce
trie = NodeTrie()
for candidate in candidates
@ -136,14 +155,25 @@ function generate_options(graph::DAG)
nodeReductions = collect(trie)
for nrSet in nodeReductions
push!(generatedReductions[threadid()], NodeReduction(collect(nrSet)))
for nrVec in nodeReductions
# parent sets are ordered and any node can only be part of one nodeReduction, so a NodeReduction is uniquely identifiable by its first element
# this prevents duplicate nodeReductions being generated
lock(checkedNodesLock)
if (nrVec[1] in checkedNodes)
unlock(checkedNodesLock)
continue
else
push!(checkedNodes, nrVec[1])
end
unlock(checkedNodesLock)
push!(generatedReductions[threadid()], NodeReduction(nrVec))
end
end
# launch thread for node reduction insertion
# removeduplicates
# remove duplicates
nr_task = @task nr_insertion!(graph.possibleOperations, generatedReductions, locks)
schedule(nr_task)

65
src/trie.jl Normal file

@ -0,0 +1,65 @@
# Helper struct for NodeTrie: one trie level below the task-type level.
# `value` collects the nodes whose insertion ended at this trie node
# (insert_helper! pushes a node here once all of its children were used as keys).
# `children` maps the id of the inserted node's next child to the sub-trie for it.
mutable struct NodeIdTrie
value::Vector{Node}
children::Dict{UUID, NodeIdTrie}
end
# Trie data structure for node reduction, inserts nodes by children
# Assumes that given nodes have ordered vectors of children (see sort_node!)
# First level is keyed by the task type and thus does not have a value;
# every entry below it is a NodeIdTrie keyed by child ids.
# Should be constructed with all Types that will be used
# NOTE(review): the zero-argument constructor creates an empty dictionary and
# insert! adds task-type entries on demand -- confirm whether pre-populating
# the types is still intended.
mutable struct NodeTrie
children::Dict{DataType, NodeIdTrie}
end
# Zero-argument constructor: a trie that has no task-type entries yet.
NodeTrie() = NodeTrie(Dict{DataType, NodeIdTrie}())
# Zero-argument constructor: a trie node with no stored nodes and no sub-tries.
NodeIdTrie() = NodeIdTrie(Node[], Dict{UUID, NodeIdTrie}())
# Insert `node` into the sub-trie rooted at `trie`.
# Starting after the first `depth` children of `node`, each remaining child's id
# is used as the key for one trie level (missing levels are created on the way).
# Once every child has been consumed, `node` is appended to the `value` vector of
# the trie node that was reached. Always returns `nothing`.
function insert_helper!(trie::NodeIdTrie, node::Node, depth::Int)
    current = trie
    level = depth
    # descend one trie level per remaining child of the node
    while length(node.children) != level
        level += 1
        child_id = node.children[level].id
        # fetch the sub-trie for this child id, creating an empty one if absent
        current = get!(current.children, child_id) do
            NodeIdTrie()
        end
    end
    push!(current.value, node)
    return nothing
end
# Insert `node` into `trie`: the first level is selected by the type of the
# node's task (created on first use), the remaining levels by the ids of the
# node's children via insert_helper!. Returns whatever insert_helper! returns.
# NOTE(review): declared without a `Base.` prefix; whether this extends
# Base.insert! or is a module-local function depends on imports not shown here.
function insert!(trie::NodeTrie, node::Node)
    subtrie = get!(trie.children, typeof(node.task)) do
        NodeIdTrie()
    end
    return insert_helper!(subtrie, node, 0)
end
# Walk the sub-trie rooted at `trie` depth-first and add to `acc` every stored
# node vector that holds at least two nodes. Always returns `nothing`.
function collect_helper(trie::NodeIdTrie, acc::Set{Vector{Node}})
    bucket = trie.value
    # single nodes (or empty buckets) have nothing to be reduced with
    length(bucket) < 2 || push!(acc, bucket)
    for subtrie in values(trie.children)
        collect_helper(subtrie, acc)
    end
    return nothing
end
# Returns the set of all node vectors with two or more nodes that have
# accumulated anywhere in the trie, gathered over every task-type sub-trie.
# NOTE(review): declared without a `Base.` prefix; whether this extends
# Base.collect or is a module-local function depends on imports not shown here.
function collect(trie::NodeTrie)
    groups = Set{Vector{Node}}()
    foreach(subtrie -> collect_helper(subtrie, groups), values(trie.children))
    return groups
end

@ -8,55 +8,11 @@ function bytes_to_human_readable(bytes::Int64)
return string(round(bytes, sigdigits=4), " ", units[unit_index])
end
# Trie data structure for node reduction, inserts nodes by children
# Assumes that given nodes have ordered sets of children (see sort_node)
mutable struct NodeTrie
value::Set{Node}
children::Dict{UUID, NodeTrie}
end
NodeTrie() = NodeTrie(Set{Node}(), Dict{UUID, NodeTrie}())
function insert_helper!(trie::NodeTrie, node::Node, depth::Int)
if (length(node.children) == depth)
push!(trie.value, node)
return nothing
end
depth = depth + 1
id = node.children[depth].id
if (!haskey(trie.children, id))
trie.children[id] = NodeTrie()
end
insert_helper!(trie.children[id], node, depth)
end
function insert!(trie::NodeTrie, node::Node)
insert_helper!(trie, node, 0)
end
function collect_helper(trie::NodeTrie, acc::Set{Set{Node}})
if (length(trie.value) >= 2)
push!(acc, trie.value)
end
for (id,child) in trie.children
collect_helper(child, acc)
end
return nothing
end
# returns all sets of multiple nodes that have accumulated in leaves
function collect(trie::NodeTrie)
acc = Set{Set{Node}}()
collect_helper(trie, acc)
return acc
end
# "Less than" predicate for nodes: orders two nodes by comparing their ids.
lt_nodes(n1::Node, n2::Node) = n1.id < n2.id
# Sort the children and the parents of `node` in place using lt_nodes
# (i.e. by id). Returns the sorted parents vector, the result of the last sort!.
function sort_node!(node::Node)
    sort!(node.children; lt = lt_nodes)
    return sort!(node.parents; lt = lt_nodes)
end