Start multithreading

This commit is contained in:
Anton Reinhard 2023-08-18 18:44:55 +02:00
parent 9cac6e76be
commit 895e4b2a12
6 changed files with 52 additions and 28 deletions

View File

@ -27,7 +27,7 @@ jobs:
run: julia --project -e 'import Pkg; Pkg.instantiate()'
- name: Run tests
run: julia --project -e 'import Pkg; Pkg.test()'
run: julia --project -t 4 -e 'import Pkg; Pkg.test()'
- name: Run examples
run: julia --project=examples/ -e 'import Pkg; Pkg.develop(Pkg.PackageSpec(path=pwd())); Pkg.instantiate(); include("examples/import_bench.jl")'
run: julia --project=examples/ -t 4 -e 'import Pkg; Pkg.develop(Pkg.PackageSpec(path=pwd())); Pkg.instantiate(); include("examples/import_bench.jl")'

View File

@ -4,6 +4,8 @@ Directed Acyclic Graph optimization for QED
## Usage
For all of the Julia calls below, pass `-t n` to run Julia with `n` threads.
Instantiate the project first:
`julia --project -e 'import Pkg; Pkg.instantiate()'`

View File

@ -427,6 +427,6 @@ function ==(op1::NodeSplit, op2::NodeSplit)
return op1.input == op2.input
end
# Convenience constructor: builds a NodeReduction from its input nodes together with a
# newly generated `UUIDs.uuid1` value drawn from the module-level `rng`.
NodeReduction(input::Vector{Node}) = NodeReduction(input, UUIDs.uuid1(rng))
# NOTE(review): `rng[threadid()]` selects the generator by the current thread id; this
# presumes `rng` has an entry for every possible thread id -- confirm.
NodeReduction(input::Vector{Node}) = NodeReduction(input, UUIDs.uuid1(rng[threadid()]))
# Copy a UUID by constructing a new UUID from the original's `value` field.
copy(id::UUID) = UUID(id.value)

View File

@ -1,3 +1,5 @@
using Base.Threads
# outside interface
# applies a new operation to the end of the graph
@ -152,9 +154,9 @@ function node_fusion!(graph::DAG, n1::ComputeTaskNode, n2::DataTaskNode, n3::Com
remove_node!(graph, n3)
# create new node with the fused compute task
new_node = ComputeTaskNode(FusedComputeTask{typeof(n1.task), typeof(n3.task)}())
new_node = ComputeTaskNode(FusedComputeTask{typeof(n1.task),typeof(n3.task)}())
insert_node!(graph, new_node)
# use a set for combined children of n1 and n3 to not get duplicates
n1and3_children = Set{Node}()
@ -255,7 +257,7 @@ function node_split!(graph::DAG, n1::Node)
n_copy = copy(n1)
insert_node!(graph, n_copy)
insert_edge!(graph, make_edge(n_copy, parent))
for child in n1_children
insert_edge!(graph, make_edge(child, n_copy))
end
@ -390,13 +392,15 @@ end
# function to generate all possible optimizations on the graph
function generate_options(graph::DAG)
options = PossibleOperations()
generatedOperations = [Vector{Operation}() for _ in 1:nthreads()]
# make sure the graph is fully generated through
apply_all!(graph)
nodeArray = collect(graph.nodes)
# find possible node fusions
for node in graph.nodes
@threads for node in nodeArray
if (typeof(node) <: DataTaskNode)
if length(node.parents) != 1
# data node can only have a single parent
@ -413,14 +417,11 @@ function generate_options(graph::DAG)
continue
end
nf = NodeFusion((child_node, node, parent_node))
push!(options.nodeFusions, nf)
push!(child_node.operations, nf)
push!(node.operations, nf)
push!(parent_node.operations, nf)
push!(generatedOperations[threadid()], NodeFusion((child_node, node, parent_node)))
end
end
# TODO figure out how to parallelize this
# find possible node reductions
visitedNodes = Set{Node}()
@ -457,24 +458,25 @@ function generate_options(graph::DAG)
end
if reductionVector !== nothing
nr = NodeReduction(reductionVector)
push!(options.nodeReductions, nr)
for node in reductionVector
push!(node.operations, nr)
end
push!(generatedOperations[threadid()], NodeReduction(reductionVector))
end
end
# find possible node splits
for node in graph.nodes
@threads for node in nodeArray
if (can_split(node))
ns = NodeSplit(node)
push!(options.nodeSplits, ns)
push!(node.operations, ns)
push!(generatedOperations[threadid()], NodeSplit(node))
end
end
# TODO figure out how to parallelize this
# insert generated operations from every thread into the final result
for genOps in generatedOperations
for op in genOps
insert_operation!(graph.possibleOperations, op)
end
end
graph.possibleOperations = options
empty!(graph.dirtyNodes)
end
@ -492,3 +494,22 @@ function get_operations(graph::DAG)
return graph.possibleOperations
end
# Register a node fusion: record it in the global list of fusions and in the
# operation list of each of the three nodes it involves.
# Returns the operation list of the third input node (the result of the last push!).
function insert_operation!(operations::PossibleOperations, nf::NodeFusion)
    push!(operations.nodeFusions, nf)

    # the fusion input holds exactly the three participating nodes, in order
    first_node, middle_node, last_node = nf.input
    push!(first_node.operations, nf)
    push!(middle_node.operations, nf)
    return push!(last_node.operations, nf)
end
# Register a node reduction: record it in the global list of reductions and in the
# operation list of every node that takes part in the reduction.
# Returns nothing.
function insert_operation!(operations::PossibleOperations, nr::NodeReduction)
    push!(operations.nodeReductions, nr)
    foreach(nr.input) do participant
        push!(participant.operations, nr)
    end
    return nothing
end
# Register a node split: record it in the global list of splits and in the
# operation list of the node that would be split.
# Returns that node's operation list (the result of the last push!).
function insert_operation!(operations::PossibleOperations, ns::NodeSplit)
    push!(operations.nodeSplits, ns)
    split_target = ns.input
    return push!(split_target.operations, ns)
end

View File

@ -46,5 +46,5 @@ function ==(n1::DataTaskNode, n2::DataTaskNode)
return n1.id == n2.id
end
# Copy methods for task nodes. The task, the parent and child lists and the operation
# list are each duplicated with `copy`; instead of reusing the original's identifier, a
# newly generated `UUIDs.uuid1` value is passed to the constructor.
copy(n::ComputeTaskNode) = ComputeTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng), copy(n.operations))
copy(n::DataTaskNode) = DataTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng), copy(n.operations))
# NOTE(review): `rng[threadid()]` selects the generator by the current thread id; this
# presumes `rng` has an entry for every possible thread id -- confirm.
copy(n::ComputeTaskNode) = ComputeTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.operations))
copy(n::DataTaskNode) = DataTaskNode(copy(n.task), copy(n.parents), copy(n.children), UUIDs.uuid1(rng[threadid()]), copy(n.operations))

View File

@ -1,7 +1,8 @@
using Random
using UUIDs
using Base.Threads
rng = Random.MersenneTwister(0)
# One random number generator per thread, indexed by `threadid()`, so that concurrent
# UUID generation never shares generator state between threads.
# Every generator gets its own seed (thread 1 keeps seed 0, preserving the
# single-threaded sequence): seeding all of them with the same value would make every
# thread draw the identical random sequence.
rng = [Random.MersenneTwister(i - 1) for i in 1:nthreads()]
abstract type Node end
@ -33,8 +34,8 @@ struct ComputeTaskNode <: Node
operations::Vector{Operation}
end
# Convenience constructors: build a node from just its task. The node starts with two
# empty `Vector{Node}` lists, a newly generated `UUIDs.uuid1` value and an empty
# `Vector{Operation}`.
DataTaskNode(t::AbstractDataTask) = DataTaskNode(t, Vector{Node}(), Vector{Node}(), UUIDs.uuid1(rng), Vector{Operation}())
ComputeTaskNode(t::AbstractComputeTask) = ComputeTaskNode(t, Vector{Node}(), Vector{Node}(), UUIDs.uuid1(rng), Vector{Operation}())
# NOTE(review): `rng[threadid()]` selects the generator by the current thread id; this
# presumes `rng` has an entry for every possible thread id -- confirm.
DataTaskNode(t::AbstractDataTask) = DataTaskNode(t, Vector{Node}(), Vector{Node}(), UUIDs.uuid1(rng[threadid()]), Vector{Operation}())
ComputeTaskNode(t::AbstractComputeTask) = ComputeTaskNode(t, Vector{Node}(), Vector{Node}(), UUIDs.uuid1(rng[threadid()]), Vector{Operation}())
struct Edge
# edge points from child to parent